Coverage for manila/tests/share/drivers/hitachi/hnas/test_driver.py: 100%

548 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Hitachi Data Systems, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from unittest import mock 

17 

18import ddt 

19from oslo_config import cfg 

20 

21from manila import exception 

22import manila.share.configuration 

23import manila.share.driver 

24from manila.share.drivers.hitachi.hnas import driver 

25from manila.share.drivers.hitachi.hnas import ssh 

26from manila import test 

27 

28CONF = cfg.CONF 

29 

# ---------------------------------------------------------------------------
# Module-level fixtures shared by the test cases below.
# The private constants only factor out identifiers that are repeated in
# several fixtures; the resulting dictionaries keep the same keys, key
# order and values as before.
# ---------------------------------------------------------------------------
_SHARE_NETWORK_ID = 'bb329e24-3bdb-491d-acfd-dfe70c09b98d'
_SHARE_SERVER_ID = 'cc345a53-491d-acfd-3bdb-dfe70c09b98d'
_NFS_SHARE_ID = 'aa4a7710-f326-41fb-ad18-b4ad587fc87a'
_CIFS_SHARE_ID = 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7'
_MOUNT_NFS_SHARE_ID = '62125744-fcdd-4f55-a8c1-d1498102f634'
_MOUNT_CIFS_SHARE_ID = 'd6e7dc6b-f65f-49d9-968d-936f75474f29'
_NFS_EXPORT_PREFIX = '172.24.44.10:/shares/'

# --- shares ----------------------------------------------------------------

# Plain NFS share hosted on the "hnas" backend.
share_nfs = {
    'id': _NFS_SHARE_ID,
    'name': _NFS_SHARE_ID,
    'size': 50,
    'host': 'hnas',
    'share_proto': 'NFS',
    'share_type_id': 1,
    'share_network_id': _SHARE_NETWORK_ID,
    'share_server_id': _SHARE_SERVER_ID,
    'export_locations': [{'path': _NFS_EXPORT_PREFIX + _NFS_SHARE_ID}],
}

# Plain CIFS share; its export path uses the \\IP\share_id form.
share_cifs = {
    'id': _CIFS_SHARE_ID,
    'name': _CIFS_SHARE_ID,
    'size': 50,
    'host': 'hnas',
    'share_proto': 'CIFS',
    'share_type_id': 1,
    'share_network_id': _SHARE_NETWORK_ID,
    'share_server_id': _SHARE_SERVER_ID,
    'export_locations': [{'path': '\\\\172.24.44.10\\' + _CIFS_SHARE_ID}],
}

# Same content as share_nfs except for the 'host' value.
share_invalid_host = {
    'id': _NFS_SHARE_ID,
    'name': _NFS_SHARE_ID,
    'size': 50,
    'host': 'invalid',
    'share_proto': 'NFS',
    'share_type_id': 1,
    'share_network_id': _SHARE_NETWORK_ID,
    'share_server_id': _SHARE_SERVER_ID,
    'export_locations': [{'path': _NFS_EXPORT_PREFIX + _NFS_SHARE_ID}],
}

# NFS share flagged with mount_snapshot_support.
share_mount_support_nfs = {
    'id': _MOUNT_NFS_SHARE_ID,
    'name': _MOUNT_NFS_SHARE_ID,
    'size': 50,
    'host': 'hnas',
    'share_proto': 'NFS',
    'share_type_id': 1,
    'share_network_id': _SHARE_NETWORK_ID,
    'share_server_id': _SHARE_SERVER_ID,
    'export_locations': [
        {'path': _NFS_EXPORT_PREFIX + _MOUNT_NFS_SHARE_ID}],
    'mount_snapshot_support': True,
}

# CIFS share flagged with mount_snapshot_support.
# NOTE(review): the export path below is NFS-style (IP:/shares/<id>) even
# though share_proto is CIFS -- confirm this is intentional.
share_mount_support_cifs = {
    'id': _MOUNT_CIFS_SHARE_ID,
    'name': _MOUNT_CIFS_SHARE_ID,
    'size': 50,
    'host': 'hnas',
    'share_proto': 'CIFS',
    'share_type_id': 1,
    'share_network_id': _SHARE_NETWORK_ID,
    'share_server_id': _SHARE_SERVER_ID,
    'export_locations': [
        {'path': _NFS_EXPORT_PREFIX + _MOUNT_CIFS_SHARE_ID}],
    'mount_snapshot_support': True,
}

# --- access rules ----------------------------------------------------------

access_nfs_rw = {
    'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
    'access_type': 'ip',
    'access_to': '172.24.44.200',
    'access_level': 'rw',
    'state': 'active',
}

access_cifs_rw = {
    'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
    'access_type': 'user',
    'access_to': 'fake_user',
    'access_level': 'rw',
    'state': 'active',
}

access_cifs_ro = {
    'id': '32407088-1f4f-40e9-b899-b9a4176b574d',
    'access_type': 'user',
    'access_to': 'fake_user',
    'access_level': 'ro',
    'state': 'active',
}

# --- snapshots -------------------------------------------------------------

snapshot_nfs = {
    'id': 'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f',
    'share_id': _NFS_SHARE_ID,
    'share': share_nfs,
    'provider_location': ('/snapshots/' + _NFS_SHARE_ID + '/' +
                          'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f'),
    'size': 2,
}

snapshot_cifs = {
    'id': '91bc6e1b-1ba5-f29c-abc1-da7618cabf0a',
    'share_id': _CIFS_SHARE_ID,
    'share': share_cifs,
    'provider_location': ('/snapshots/' + _CIFS_SHARE_ID + '/' +
                          '91bc6e1b-1ba5-f29c-abc1-da7618cabf0a'),
    'size': 2,
}

# Snapshot whose provider_location does not end with its own id.
manage_snapshot = {
    'id': 'bc168eb-fa71-beef-153a-3d451aa1351f',
    'share_id': _NFS_SHARE_ID,
    'share': share_nfs,
    'provider_location': ('/snapshots/' + _NFS_SHARE_ID +
                          '/snapshot18-05-2106'),
}

snapshot_mount_support_nfs = {
    'id': '3377b015-a695-4a5a-8aa5-9b931b023380',
    'share_id': _MOUNT_NFS_SHARE_ID,
    'share': share_mount_support_nfs,
    'provider_location': ('/snapshots/' + _MOUNT_NFS_SHARE_ID +
                          '/3377b015-a695-4a5a-8aa5-9b931b023380'),
}

snapshot_mount_support_cifs = {
    'id': 'f9916515-5cb8-4612-afa6-7f2baa74223a',
    'share_id': _MOUNT_CIFS_SHARE_ID,
    'share': share_mount_support_cifs,
    'provider_location': ('/snapshots/' + _MOUNT_CIFS_SHARE_ID +
                          '/f9916515-5cb8-4612-afa6-7f2baa74223a'),
}

# --- invalid inputs --------------------------------------------------------

# Share using a protocol other than NFS/CIFS.
invalid_share = {
    'id': _NFS_SHARE_ID,
    'name': _NFS_SHARE_ID,
    'size': 100,
    'host': 'hnas',
    'share_proto': 'HDFS',
}

invalid_snapshot = {
    'id': '24dcdcb5-a582-4bcc-b462-641da143afee',
    'share_id': _NFS_SHARE_ID,
    'share': invalid_share,
}

invalid_access_type = {
    'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
    'access_type': 'cert',
    'access_to': 'manila_user',
    'access_level': 'rw',
    'state': 'active',
}

invalid_access_level = {
    'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
    'access_type': 'ip',
    'access_to': 'manila_user',
    'access_level': '777',
    'state': 'active',
}

# Message the tests compare against ``ex.msg`` when invalid_share is used.
invalid_protocol_msg = (
    "Share backend error: Only NFS or CIFS protocol are "
    "currently supported. Share provided %(id)s with "
    "protocol %(proto)s." % {
        'id': invalid_share['id'],
        'proto': invalid_share['share_proto'],
    })

198 

199 

200@ddt.ddt 

201class HitachiHNASTestCase(test.TestCase): 

def setUp(self):
    """Set HNAS options on CONF, build the driver and stub backend checks.

    After this runs the test has ``self.fake_conf``,
    ``self.fake_private_storage`` (a mock whose ``get``/``delete`` return
    None), ``self._driver`` and ``self.mock_log`` (the patched driver LOG).
    """
    super(HitachiHNASTestCase, self).setUp()
    CONF.set_default('driver_handles_share_servers', False)

    # Option values are assigned directly on CONF, in this order.
    hnas_options = {
        'hitachi_hnas_evs_id': '2',
        'hitachi_hnas_evs_ip': '172.24.44.10',
        'hitachi_hnas_admin_network_ip': '10.20.30.40',
        'hitachi_hnas_ip': '172.24.44.1',
        'hitachi_hnas_ip_port': 'hitachi_hnas_ip_port',
        'hitachi_hnas_user': 'hitachi_hnas_user',
        'hitachi_hnas_password': 'hitachi_hnas_password',
        'hitachi_hnas_file_system_name': 'file_system',
        'hitachi_hnas_ssh_private_key': 'private_key',
        'hitachi_hnas_cluster_admin_ip0': None,
        'hitachi_hnas_stalled_job_timeout': 10,
        'hitachi_hnas_driver_helper': ('manila.share.drivers.hitachi.hnas.'
                                       'ssh.HNASSSHBackend'),
    }
    for option_name, option_value in hnas_options.items():
        setattr(CONF, option_name, option_value)
    self.fake_conf = manila.share.configuration.Configuration(None)

    # Private storage stub: both accessors answer None.
    self.fake_private_storage = mock.Mock()
    for accessor in ('get', 'delete'):
        self.mock_object(self.fake_private_storage, accessor,
                         mock.Mock(return_value=None))

    self._driver = driver.HitachiHNASDriver(
        private_storage=self.fake_private_storage,
        configuration=self.fake_conf)
    self._driver.backend_name = "hnas"
    self.mock_log = self.mock_object(driver, 'LOG')

    # mocking common backend calls
    backend_cls = ssh.HNASSSHBackend
    self.mock_object(backend_cls, "check_fs_mounted",
                     mock.Mock(return_value=True))
    for check_name in ("check_vvol", "check_quota", "check_cifs",
                       "check_export", 'check_directory'):
        self.mock_object(backend_cls, check_name)

240 

@ddt.data('hitachi_hnas_driver_helper', 'hitachi_hnas_evs_id',
          'hitachi_hnas_evs_ip', 'hitachi_hnas_ip', 'hitachi_hnas_user')
def test_init_invalid_conf_parameters(self, attr_name):
    """Driver ``__init__`` raises when ``attr_name`` is None on CONF."""
    # The base-class initializer is stubbed out for this check.
    self.mock_object(manila.share.driver.ShareDriver, '__init__')
    setattr(CONF, attr_name, None)

    init_driver = self._driver.__init__
    self.assertRaises(exception.InvalidParameterValue, init_driver)

249 

def test_init_invalid_credentials(self):
    """Driver ``__init__`` raises when password and SSH key are both None."""
    self.mock_object(manila.share.driver.ShareDriver, '__init__')
    for credential_option in ('hitachi_hnas_password',
                              'hitachi_hnas_ssh_private_key'):
        setattr(CONF, credential_option, None)

    init_driver = self._driver.__init__
    self.assertRaises(exception.InvalidParameterValue, init_driver)

258 

@ddt.data(True, False)
def test_update_access_nfs(self, empty_rules):
    """update_access on an NFS share forwards the expected host list.

    With ``empty_rules`` the rule list and the expected backend argument
    are both empty; otherwise one rw and one ro IP rule are supplied.
    """
    if empty_rules:
        rules = []
        expected_hosts = []
    else:
        rw_rule = {
            'access_type': 'ip',
            'access_to': '172.24.10.10',
            'access_level': 'rw'
        }
        ro_rule = {
            'access_type': 'ip',
            'access_to': '188.100.20.10',
            'access_level': 'ro'
        }
        rules = [rw_rule, ro_rule]
        expected_hosts = [
            '%s(%s,norootsquash)' % (rw_rule['access_to'],
                                     rw_rule['access_level']),
            '%s(%s)' % (ro_rule['access_to'], ro_rule['access_level']),
        ]

    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "update_nfs_access_rule", mock.Mock())

    self._driver.update_access('context', share_nfs, rules, [], [], [])

    backend.update_nfs_access_rule.assert_called_once_with(
        expected_hosts, share_id=share_nfs['id'])
    self.assertTrue(self.mock_log.debug.called)

289 

def test_update_access_ip_exception(self):
    """An NFS rule list containing a non-'ip' access type is rejected."""
    rules = [
        {
            'access_type': 'ip',
            'access_to': '188.100.20.10',
            'access_level': 'ro'
        },
        {
            # Access type other than 'ip'.
            'access_type': 'something',
            'access_to': '172.24.10.10',
            'access_level': 'rw'
        },
    ]

    self.assertRaises(exception.InvalidShareAccess,
                      self._driver.update_access,
                      'context', share_nfs, rules, [], [], [])

306 

def test_update_access_not_found_exception(self):
    """HNASItemNotFoundException from _ensure_share surfaces as
    ShareResourceNotFound."""
    rules = [
        {
            'access_type': 'ip',
            'access_to': '188.100.20.10',
            'access_level': 'ro'
        },
        {
            'access_type': 'something',
            'access_to': '172.24.10.10',
            'access_level': 'rw'
        },
    ]

    not_found = exception.HNASItemNotFoundException(msg='fake')
    self.mock_object(self._driver, '_ensure_share',
                     mock.Mock(side_effect=not_found))

    self.assertRaises(exception.ShareResourceNotFound,
                      self._driver.update_access,
                      'context', share_nfs, rules,
                      add_rules=[], delete_rules=[], update_rules=[])

327 

@ddt.data([access_cifs_rw, 'acr'], [access_cifs_ro, 'ar'])
@ddt.unpack
def test_allow_access_cifs(self, access_cifs, permission):
    """Adding a CIFS rule calls cifs_allow_access with ``permission``."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, 'cifs_allow_access')
    rules_to_add = [access_cifs]

    self._driver.update_access('context', share_cifs, [], rules_to_add,
                               [], [])

    backend.cifs_allow_access.assert_called_once_with(
        share_cifs['id'], 'fake_user', permission, is_snapshot=False)
    self.assertTrue(self.mock_log.debug.called)

341 

def test_allow_access_cifs_invalid_type(self):
    """Adding an 'ip' rule to a CIFS share raises InvalidShareAccess."""
    ip_rule = {
        'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
        'access_type': 'ip',
        'access_to': 'fake_user',
        'access_level': 'rw',
        'state': 'active',
    }
    rules_to_add = [ip_rule]

    self.assertRaises(exception.InvalidShareAccess,
                      self._driver.update_access,
                      'context', share_cifs, [], rules_to_add, [], [])

355 

def test_deny_access_cifs(self):
    """Deleting a CIFS user rule calls cifs_deny_access for that user."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, 'cifs_deny_access')
    rules_to_delete = [access_cifs_rw]

    self._driver.update_access('context', share_cifs, [], [],
                               rules_to_delete, [])

    backend.cifs_deny_access.assert_called_once_with(
        share_cifs['id'], 'fake_user', is_snapshot=False)
    self.assertTrue(self.mock_log.debug.called)

367 

def test_deny_access_cifs_unsupported_type(self):
    """Deleting an 'ip' rule from a CIFS share logs a warning."""
    ip_rule = {
        'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
        'access_type': 'ip',
        'access_to': 'fake_user',
        'access_level': 'rw',
        'state': 'active',
    }
    rules_to_delete = [ip_rule]

    self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access')

    self._driver.update_access('context', share_cifs, [], [],
                               rules_to_delete, [])

    self.assertTrue(self.mock_log.warning.called)

383 

def test_update_access_invalid_share_protocol(self):
    """update_access on ``invalid_share`` raises with the protocol
    message."""
    self.mock_object(self._driver, '_ensure_share')

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.update_access,
                               'context', invalid_share, [], [], [], [])

    self.assertEqual(invalid_protocol_msg, raised.msg)

390 

def test_update_access_cifs_recovery_mode(self):
    """A full CIFS rule list with no add/delete rules lists the
    permissions currently set on the share."""
    backend = ssh.HNASSSHBackend
    current_permissions = [('fake_user1', 'acr'), ('fake_user2', 'ar')]
    rules = [access_cifs_rw, access_cifs_ro]

    self.mock_object(backend, 'list_cifs_permissions',
                     mock.Mock(return_value=current_permissions))
    # Stubbed so no real backend call is made; not asserted on here.
    self.mock_object(backend, 'cifs_deny_access')
    self.mock_object(backend, 'cifs_allow_access')

    self._driver.update_access('context', share_cifs, rules, [], [], [])

    backend.list_cifs_permissions.assert_called_once_with(
        share_cifs['id'])
    self.assertTrue(self.mock_log.debug.called)

406 

407 def _get_export(self, id, share_proto, ip, is_admin_only, 

408 is_snapshot=False): 

409 if share_proto.lower() == 'nfs': 

410 if is_snapshot: 

411 path = '/snapshots/' + id 

412 else: 

413 path = '/shares/' + id 

414 export = ':'.join((ip, path)) 

415 else: 

416 export = r'\\%s\%s' % (ip, id) 

417 

418 return { 

419 "path": export, 

420 "is_admin_only": is_admin_only, 

421 "metadata": {}, 

422 } 

423 

@ddt.data(share_nfs, share_cifs)
def test_create_share(self, share):
    """create_share creates the vvol and quota, adds the export matching
    the share protocol, and returns both export locations."""
    backend = ssh.HNASSSHBackend
    share_id = share['id']
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    self.mock_object(backend, "vvol_create")
    self.mock_object(backend, "quota_add")
    self.mock_object(backend, "nfs_export_add",
                     mock.Mock(return_value='/shares/' + share_id))
    self.mock_object(backend, "cifs_share_add")

    result = self._driver.create_share('context', share)

    self.assertTrue(self.mock_log.debug.called)
    backend.vvol_create.assert_called_once_with(share_id)
    backend.quota_add.assert_called_once_with(share_id, share['size'])

    # One regular export on the EVS IP, one admin-only export.
    expected = [
        self._get_export(share_id, share['share_proto'], address,
                         admin_only)
        for address, admin_only in (
            (self._driver.hnas_evs_ip, False),
            (self._driver.hnas_admin_network_ip, True))
    ]

    if share['share_proto'].lower() == 'nfs':
        used_export, unused_export = (backend.nfs_export_add,
                                      backend.cifs_share_add)
    else:
        used_export, unused_export = (backend.cifs_share_add,
                                      backend.nfs_export_add)
    used_export.assert_called_once_with(share_id, snapshot_id=None)
    self.assertFalse(unused_export.called)
    self.assertEqual(expected, result)

457 

def test_create_share_export_error(self):
    """When nfs_export_add fails, the exception propagates and
    vvol_delete is called for the share."""
    backend = ssh.HNASSSHBackend
    share_id = share_nfs['id']
    export_failure = exception.HNASBackendException('msg')

    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    self.mock_object(backend, "vvol_create")
    self.mock_object(backend, "quota_add")
    self.mock_object(backend, "nfs_export_add",
                     mock.Mock(side_effect=export_failure))
    self.mock_object(backend, "vvol_delete")

    self.assertRaises(exception.HNASBackendException,
                      self._driver.create_share, 'context', share_nfs)

    self.assertTrue(self.mock_log.debug.called)
    backend.vvol_create.assert_called_once_with(share_id)
    backend.quota_add.assert_called_once_with(share_id,
                                              share_nfs['size'])
    backend.nfs_export_add.assert_called_once_with(share_id,
                                                   snapshot_id=None)
    backend.vvol_delete.assert_called_once_with(share_id)

476 

def test_create_share_invalid_share_protocol(self):
    """create_share on ``invalid_share`` raises with the protocol
    message."""
    self.mock_object(driver.HitachiHNASDriver, "_create_share",
                     mock.Mock(return_value="path"))

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.create_share,
                               'context', invalid_share)

    self.assertEqual(invalid_protocol_msg, raised.msg)

485 

@ddt.data(share_nfs, share_cifs)
def test_delete_share(self, share):
    """delete_share removes the vvol and only the export matching the
    share protocol."""
    backend = ssh.HNASSSHBackend
    share_id = share['id']
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for backend_call in ("nfs_export_del", "cifs_share_del",
                         "vvol_delete"):
        self.mock_object(backend, backend_call)

    self._driver.delete_share('context', share)

    self.assertTrue(self.mock_log.debug.called)
    backend.vvol_delete.assert_called_once_with(share_id)

    if share['share_proto'].lower() == 'nfs':
        used_delete, unused_delete = (backend.nfs_export_del,
                                      backend.cifs_share_del)
    else:
        used_delete, unused_delete = (backend.cifs_share_del,
                                      backend.nfs_export_del)
    used_delete.assert_called_once_with(share_id)
    self.assertFalse(unused_delete.called)

507 

@ddt.data(snapshot_nfs, snapshot_cifs, snapshot_mount_support_nfs,
          snapshot_mount_support_cifs)
def test_create_snapshot(self, snapshot):
    """create_snapshot clones the share tree and returns the provider
    location, plus export locations when the share has
    mount_snapshot_support.

    For NFS the test also checks that the access rules are pushed twice:
    once as ``ro_list`` and once as the original list. For CIFS it checks
    the in-use query.
    """
    backend = ssh.HNASSSHBackend
    parent_share = snapshot['share']
    hnas_id = snapshot['share_id']
    snapshot_path = '/snapshots/' + hnas_id + '/' + snapshot['id']

    # Host list reported by the backend before the snapshot.
    access_list = ['172.24.44.200(rw,norootsquash)',
                   '172.24.49.180(all_squash,read_write,secure)',
                   '172.24.49.110(ro, secure)',
                   '172.24.49.112(secure,readwrite,norootsquash)',
                   '172.24.49.142(read_only, secure)',
                   '172.24.49.201(rw,read_write,readwrite)',
                   '172.24.49.218(rw)']

    # Same hosts with every read-write spelling replaced by 'ro'.
    ro_list = ['172.24.44.200(ro,norootsquash)',
               '172.24.49.180(all_squash,ro,secure)',
               '172.24.49.110(ro, secure)',
               '172.24.49.112(secure,ro,norootsquash)',
               '172.24.49.142(read_only, secure)',
               '172.24.49.201(ro,ro,ro)',
               '172.24.49.218(ro)']

    export_locations = [
        self._get_export(snapshot['id'], parent_share['share_proto'],
                         address, admin_only, is_snapshot=True)
        for address, admin_only in (
            (self._driver.hnas_evs_ip, False),
            (self._driver.hnas_admin_network_ip, True))
    ]

    expected = {'provider_location': snapshot_path}
    if parent_share.get('mount_snapshot_support'):
        expected['export_locations'] = export_locations

    self.mock_object(backend, "get_nfs_host_list",
                     mock.Mock(return_value=access_list))
    self.mock_object(backend, "update_nfs_access_rule", mock.Mock())
    self.mock_object(backend, "is_cifs_in_use",
                     mock.Mock(return_value=False))
    for backend_call in ("tree_clone", "nfs_export_add",
                         "cifs_share_add"):
        self.mock_object(backend, backend_call)

    out = self._driver.create_snapshot('context', snapshot)

    backend.tree_clone.assert_called_once_with('/shares/' + hnas_id,
                                               snapshot_path)
    self.assertEqual(expected, out)

    if parent_share['share_proto'].lower() != 'nfs':
        backend.is_cifs_in_use.assert_called_once_with(hnas_id)
        return

    backend.get_nfs_host_list.assert_called_once_with(hnas_id)
    backend.update_nfs_access_rule.assert_any_call(ro_list,
                                                   share_id=hnas_id)
    backend.update_nfs_access_rule.assert_any_call(access_list,
                                                   share_id=hnas_id)

569 

def test_create_snapshot_invalid_protocol(self):
    """create_snapshot on ``invalid_snapshot`` raises with the protocol
    message."""
    self.mock_object(self._driver, '_ensure_share')

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.create_snapshot,
                               'context', invalid_snapshot)

    self.assertEqual(invalid_protocol_msg, raised.msg)

576 

def test_create_snapshot_cifs_exception(self):
    """create_snapshot raises when the backend reports the CIFS share
    as in use."""
    self.mock_object(ssh.HNASSSHBackend, "is_cifs_in_use",
                     mock.Mock(return_value=True))
    expected_msg = ("Share backend error: CIFS snapshot when share is "
                    "mounted is disabled. Set "
                    "hitachi_hnas_allow_cifs_snapshot_while_mounted to "
                    "True or unmount the share to take a snapshot.")

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.create_snapshot,
                               'context', snapshot_cifs)

    self.assertEqual(expected_msg, raised.msg)

590 

def test_create_snapshot_first_snapshot(self):
    """When tree_clone raises HNASNothingToCloneException, a warning is
    logged and the snapshot directory is created instead."""
    backend = ssh.HNASSSHBackend
    hnas_id = snapshot_nfs['share_id']
    nothing_to_clone = exception.HNASNothingToCloneException('msg')

    self.mock_object(backend, "get_nfs_host_list",
                     mock.Mock(return_value=['172.24.44.200(rw)']))
    self.mock_object(backend, "update_nfs_access_rule", mock.Mock())
    self.mock_object(backend, "tree_clone",
                     mock.Mock(side_effect=nothing_to_clone))
    for backend_call in ("create_directory", "nfs_export_add",
                         "cifs_share_add"):
        self.mock_object(backend, backend_call)

    self._driver.create_snapshot('context', snapshot_nfs)

    self.assertTrue(self.mock_log.warning.called)
    backend.get_nfs_host_list.assert_called_once_with(hnas_id)
    # Rules are pushed once as read-only and once in original form.
    for pushed_rules in (['172.24.44.200(ro)'], ['172.24.44.200(rw)']):
        backend.update_nfs_access_rule.assert_any_call(pushed_rules,
                                                       share_id=hnas_id)
    backend.create_directory.assert_called_once_with(
        '/snapshots/' + hnas_id + '/' + snapshot_nfs['id'])

614 

@ddt.data(snapshot_nfs, snapshot_cifs,
          snapshot_mount_support_nfs, snapshot_mount_support_cifs)
def test_delete_snapshot(self, snapshot):
    """delete_snapshot removes the snapshot tree and its parent
    directory, and deletes the snapshot export only when the share has
    mount_snapshot_support."""
    backend = ssh.HNASSSHBackend
    hnas_share_id = snapshot['share_id']
    hnas_snapshot_id = snapshot['id']

    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted")
    for backend_call in ("tree_delete", "delete_directory",
                         "nfs_export_del", "cifs_share_del"):
        self.mock_object(backend, backend_call)

    self._driver.delete_snapshot('context', snapshot)

    self.assertTrue(self.mock_log.debug.called)
    self.assertTrue(self.mock_log.info.called)
    driver.HitachiHNASDriver._check_fs_mounted.assert_called_once_with()
    backend.tree_delete.assert_called_once_with(
        '/snapshots/' + hnas_share_id + '/' + snapshot['id'])
    backend.delete_directory.assert_called_once_with(
        '/snapshots/' + hnas_share_id)

    is_nfs = snapshot['share']['share_proto'].lower() == 'nfs'
    mountable = snapshot['share'].get('mount_snapshot_support')
    if is_nfs and mountable:
        backend.nfs_export_del.assert_called_once_with(
            snapshot_id=hnas_snapshot_id)
    elif is_nfs:
        backend.nfs_export_del.assert_not_called()
    elif mountable:
        backend.cifs_share_del.assert_called_once_with(hnas_snapshot_id)
    else:
        backend.cifs_share_del.assert_not_called()

647 

def test_delete_managed_snapshot(self):
    """delete_snapshot on ``manage_snapshot`` deletes the tree at its
    provider_location."""
    backend = ssh.HNASSSHBackend
    hnas_id = manage_snapshot['share_id']

    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted")
    for backend_call in ("tree_delete", "delete_directory",
                         "nfs_export_del", "cifs_share_del"):
        self.mock_object(backend, backend_call)

    self._driver.delete_snapshot('context', manage_snapshot)

    self.assertTrue(self.mock_log.debug.called)
    self.assertTrue(self.mock_log.info.called)
    driver.HitachiHNASDriver._check_fs_mounted.assert_called_once_with()
    backend.tree_delete.assert_called_once_with(
        manage_snapshot['provider_location'])
    backend.delete_directory.assert_called_once_with(
        '/snapshots/' + hnas_id)

665 

@ddt.data(share_nfs, share_cifs)
def test_ensure_share(self, share):
    """ensure_share checks vvol, quota and the protocol-specific export,
    then returns both export locations."""
    backend = ssh.HNASSSHBackend
    share_id = share['id']

    result = self._driver.ensure_share('context', share)

    backend.check_vvol.assert_called_once_with(share_id)
    backend.check_quota.assert_called_once_with(share_id)

    expected = [
        self._get_export(share_id, share['share_proto'], address,
                         admin_only)
        for address, admin_only in (
            (self._driver.hnas_evs_ip, False),
            (self._driver.hnas_admin_network_ip, True))
    ]

    if share['share_proto'].lower() == 'nfs':
        used_check, unused_check = (backend.check_export,
                                    backend.check_cifs)
    else:
        used_check, unused_check = (backend.check_cifs,
                                    backend.check_export)
    used_check.assert_called_once_with(share_id)
    self.assertFalse(unused_check.called)
    self.assertEqual(expected, result)

689 

def test_ensure_share_invalid_protocol(self):
    """ensure_share on ``invalid_share`` raises with the protocol
    message."""
    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.ensure_share,
                               'context', invalid_share)

    self.assertEqual(invalid_protocol_msg, raised.msg)

696 

def test_shrink_share(self):
    """Shrinking to 11 with a reported usage of 10 modifies the quota."""
    backend = ssh.HNASSSHBackend
    new_size = 11
    self.mock_object(backend, "get_share_usage",
                     mock.Mock(return_value=10))
    self.mock_object(backend, "modify_quota")

    self._driver.shrink_share(share_nfs, new_size)

    backend.get_share_usage.assert_called_once_with(share_nfs['id'])
    backend.modify_quota.assert_called_once_with(share_nfs['id'],
                                                 new_size)

708 

def test_shrink_share_new_size_lower_than_usage(self):
    """Shrinking to 9 with a reported usage of 10 raises
    ShareShrinkingPossibleDataLoss."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "get_share_usage",
                     mock.Mock(return_value=10))

    self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
                      self._driver.shrink_share, share_nfs, 9)

    backend.get_share_usage.assert_called_once_with(share_nfs['id'])

717 

def test_extend_share(self):
    """Extending to 150 with stats (500, 200, True) modifies the quota."""
    backend = ssh.HNASSSHBackend
    new_size = 150
    self.mock_object(backend, "get_stats",
                     mock.Mock(return_value=(500, 200, True)))
    self.mock_object(backend, "modify_quota")

    self._driver.extend_share(share_nfs, new_size)

    backend.get_stats.assert_called_once_with()
    backend.modify_quota.assert_called_once_with(share_nfs['id'],
                                                 new_size)

728 

def test_extend_share_with_no_available_space_in_fs(self):
    """Extending to 1000 with stats (500, 200, False) raises
    HNASBackendException."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "get_stats",
                     mock.Mock(return_value=(500, 200, False)))
    self.mock_object(backend, "modify_quota")

    self.assertRaises(exception.HNASBackendException,
                      self._driver.extend_share, share_nfs, 1000)

    backend.get_stats.assert_called_once_with()

737 

@ddt.data(share_nfs, share_cifs)
def test_manage_existing(self, share):
    """manage_existing returns the quota as size plus both export
    locations."""
    backend = ssh.HNASSSHBackend
    share_id = share['id']

    expected_exports = [
        self._get_export(share_id, share['share_proto'], address,
                         admin_only)
        for address, admin_only in (
            (self._driver.hnas_evs_ip, False),
            (self._driver.hnas_admin_network_ip, True))
    ]
    expected_out = {
        'size': share['size'],
        'export_locations': expected_exports,
    }

    self.mock_object(backend, "get_share_quota",
                     mock.Mock(return_value=share['size']))

    out = self._driver.manage_existing(share, 'option')

    self.assertEqual(expected_out, out)
    backend.get_share_quota.assert_called_once_with(share_id)

760 

def test_manage_existing_no_quota(self):
    """manage_existing raises ManageInvalidShare when the backend
    reports no quota (None)."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "get_share_quota",
                     mock.Mock(return_value=None))

    self.assertRaises(exception.ManageInvalidShare,
                      self._driver.manage_existing, share_nfs, 'option')

    backend.get_share_quota.assert_called_once_with(share_nfs['id'])

769 

def test_manage_existing_wrong_share_id(self):
    """manage_existing raises HNASBackendException when private storage
    returns 'Wrong_share_id'."""
    stored_value = mock.Mock(return_value='Wrong_share_id')
    self.mock_object(self.fake_private_storage, 'get', stored_value)

    self.assertRaises(exception.HNASBackendException,
                      self._driver.manage_existing, share_nfs, 'option')

776 

@ddt.data(':/', '1.1.1.1:/share_id', '1.1.1.1:/shares',
          '1.1.1.1:shares/share_id', ':/share_id')
def test_manage_existing_wrong_path_format_nfs(self, wrong_location):
    """Each malformed NFS export path yields the NFS format message."""
    nfs_format_msg = ("Share backend error: Incorrect path. It "
                      "should have the following format: "
                      "IP:/shares/share_id.")
    # A copy is passed because the helper overwrites export_locations.
    share_under_test = share_nfs.copy()

    self._test_manage_existing_wrong_path(
        share_under_test, nfs_format_msg, wrong_location)

786 

@ddt.data('\\\\1.1.1.1', '1.1.1.1\\share_id', '1.1.1.1\\shares\\share_id',
          '\\\\1.1.1.1\\shares\\share_id', '\\\\share_id')
def test_manage_existing_wrong_path_format_cifs(self, wrong_location):
    """Each malformed CIFS export path yields the CIFS format message."""
    cifs_format_msg = ("Share backend error: Incorrect path. It should "
                       "have the following format: \\\\IP\\share_id.")
    # A copy is passed because the helper overwrites export_locations.
    share_under_test = share_cifs.copy()

    self._test_manage_existing_wrong_path(
        share_under_test, cifs_format_msg, wrong_location)

795 

def _test_manage_existing_wrong_path(
        self, share, expected_exception, wrong_location):
    """Assert manage_existing rejects ``wrong_location`` with a message.

    :param share: share dict; its 'export_locations' entry is overwritten
        in place, so callers should pass a copy of any shared fixture.
    :param expected_exception: message expected on the raised
        ShareBackendException.
    :param wrong_location: export path to place on the share.
    """
    share['export_locations'] = [{'path': wrong_location}]

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.manage_existing,
                               share, 'option')

    self.assertEqual(expected_exception, raised.msg)

805 

def test_manage_existing_wrong_evs_ip(self):
    """manage_existing raises ShareBackendException for an export path
    on 172.24.44.189.

    The export path is set on a copy of ``share_nfs``. Assigning it on
    the module-level fixture would leak into every later test that uses
    ``share_nfs`` (directly, through ddt data, or through
    ``snapshot_nfs['share']``). A shallow copy is enough because only the
    'export_locations' key is rebound.
    """
    # NOTE(review): besides the IP, the share id in this path is also
    # truncated -- confirm which of the two the driver rejects.
    share = share_nfs.copy()
    share['export_locations'] = [{'path': '172.24.44.189:/shares/'
                                          'aa4a7710-f326-41fb-ad18-'}]

    self.assertRaises(exception.ShareBackendException,
                      self._driver.manage_existing, share,
                      'option')

813 

def test_manage_existing_invalid_host(self):
    """manage_existing raises ShareBackendException for
    ``share_invalid_host``."""
    manage = self._driver.manage_existing
    self.assertRaises(exception.ShareBackendException,
                      manage, share_invalid_host, 'option')

818 

def test_manage_existing_invalid_protocol(self):
    """manage_existing raises ShareBackendException for
    ``invalid_share``."""
    manage = self._driver.manage_existing
    self.assertRaises(exception.ShareBackendException,
                      manage, invalid_share, 'option')

823 

@ddt.data(True, False)
def test_unmanage(self, has_export_locations):
    """unmanage deletes from private storage and logs at info level,
    with or without export locations on the share."""
    share_under_test = share_nfs.copy()
    if not has_export_locations:
        share_under_test['export_locations'] = []

    self._driver.unmanage(share_under_test)

    storage_delete = self.fake_private_storage.delete
    self.assertTrue(storage_delete.called)
    self.assertTrue(self.mock_log.info.called)

834 

def test_get_network_allocations_number(self):
    """The driver reports zero network allocations."""
    self.assertEqual(0, self._driver.get_network_allocations_number())

839 

@ddt.data([share_nfs, snapshot_nfs], [share_cifs, snapshot_cifs])
@ddt.unpack
def test_create_share_from_snapshot(self, share, snapshot):
    """Create a share from a snapshot for an NFS and for a CIFS share.

    Checks the vvol, quota and tree-clone backend calls, that only the
    export call matching the share protocol is issued, and the returned
    export locations.
    """
    backend = ssh.HNASSSHBackend
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add", "tree_clone",
                        "cifs_share_add", "nfs_export_add"):
        self.mock_object(backend, method_name)

    result = self._driver.create_share_from_snapshot(
        'context', share, snapshot)

    share_id = share['id']
    backend.vvol_create.assert_called_once_with(share_id)
    backend.quota_add.assert_called_once_with(share_id, share['size'])
    backend.tree_clone.assert_called_once_with(
        '/snapshots/' + share_id + '/' + snapshot['id'],
        '/shares/' + share_id)

    protocol = share['share_proto']
    evs_export = self._get_export(
        share_id, protocol, self._driver.hnas_evs_ip, False)
    admin_export = self._get_export(
        share_id, protocol, self._driver.hnas_admin_network_ip, True)
    expected = [evs_export, admin_export]

    # Exactly one of the two export-creation calls may have been used.
    if protocol.lower() == 'nfs':
        used, unused = backend.nfs_export_add, backend.cifs_share_add
    else:
        used, unused = backend.cifs_share_add, backend.nfs_export_add
    used.assert_called_once_with(share_id)
    self.assertFalse(unused.called)

    self.assertEqual(expected, result)

880 

def test_create_share_from_snapshot_empty_snapshot(self):
    """An HNASNothingToCloneException from tree_clone is tolerated.

    The share is still exported and its export locations returned, and a
    warning is logged.
    """
    backend = ssh.HNASSSHBackend
    nothing_to_clone = mock.Mock(
        side_effect=exception.HNASNothingToCloneException('msg'))
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    self.mock_object(backend, "vvol_create")
    self.mock_object(backend, "quota_add")
    self.mock_object(backend, "tree_clone", nothing_to_clone)
    self.mock_object(backend, "nfs_export_add")

    result = self._driver.create_share_from_snapshot(
        'context', share_nfs, snapshot_nfs)

    share_id = share_nfs['id']
    protocol = share_nfs['share_proto']
    evs_export = self._get_export(
        share_id, protocol, self._driver.hnas_evs_ip, False)
    admin_export = self._get_export(
        share_id, protocol, self._driver.hnas_admin_network_ip, True)

    self.assertEqual([evs_export, admin_export], result)
    self.assertTrue(self.mock_log.warning.called)
    backend.vvol_create.assert_called_once_with(share_nfs['id'])
    backend.quota_add.assert_called_once_with(
        share_nfs['id'], share_nfs['size'])
    backend.tree_clone.assert_called_once_with(
        '/snapshots/' + share_nfs['id'] + '/' + snapshot_nfs['id'],
        '/shares/' + share_nfs['id'])
    backend.nfs_export_add.assert_called_once_with(share_nfs['id'])

910 

def test_create_share_from_snapshot_invalid_protocol(self):
    """The invalid_share fixture fails with the invalid protocol message."""
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add", "tree_clone"):
        self.mock_object(ssh.HNASSSHBackend, method_name)

    raised = self.assertRaises(
        exception.ShareBackendException,
        self._driver.create_share_from_snapshot,
        'context', invalid_share, snapshot_nfs)

    self.assertEqual(invalid_protocol_msg, raised.msg)

922 

def test_create_share_from_snapshot_cleanup(self):
    """A failing nfs_export_add leads to vvol_delete for the share.

    The HNASBackendException raised by the mocked nfs_export_add must
    reach the caller, and vvol_delete must have been called once with the
    share id.
    """
    backend = ssh.HNASSSHBackend
    snapshot_path = ('/snapshots/' + share_nfs['id'] + '/' +
                     snapshot_nfs['id'])
    share_path = '/shares/' + share_nfs['id']
    failing_export = mock.Mock(
        side_effect=exception.HNASBackendException(
            msg='Error adding nfs export.'))

    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add", "tree_clone",
                        "vvol_delete"):
        self.mock_object(backend, method_name)
    self.mock_object(backend, "nfs_export_add", failing_export)

    self.assertRaises(
        exception.HNASBackendException,
        self._driver.create_share_from_snapshot,
        'context', share_nfs, snapshot_nfs)

    backend.vvol_create.assert_called_once_with(share_nfs['id'])
    backend.quota_add.assert_called_once_with(
        share_nfs['id'], share_nfs['size'])
    backend.tree_clone.assert_called_once_with(snapshot_path, share_path)
    backend.nfs_export_add.assert_called_once_with(share_nfs['id'])
    backend.vvol_delete.assert_called_once_with(share_nfs['id'])

951 

def test__check_fs_mounted(self):
    """_check_fs_mounted calls the backend check once, without arguments."""
    self._driver._check_fs_mounted()

    backend_check = ssh.HNASSSHBackend.check_fs_mounted
    backend_check.assert_called_once_with()

956 

def test__check_fs_mounted_not_mounted(self):
    """A False result from the backend check raises HNASBackendException."""
    not_mounted = mock.Mock(return_value=False)
    self.mock_object(ssh.HNASSSHBackend, 'check_fs_mounted', not_mounted)

    self.assertRaises(
        exception.HNASBackendException, self._driver._check_fs_mounted)

    backend_check = ssh.HNASSSHBackend.check_fs_mounted
    backend_check.assert_called_once_with()

965 

def test__update_share_stats(self):
    """_update_share_stats passes the expected dict to the base driver.

    The backend get_stats call is mocked to return (1000, 200, True); the
    test expects these to show up as total capacity, free capacity and
    the dedupe flag of the data handed to ShareDriver._update_share_stats.
    """
    self.mock_object(ssh.HNASSSHBackend, 'get_stats', mock.Mock(
        return_value=(1000, 200, True)))
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    self.mock_object(manila.share.driver.ShareDriver,
                     '_update_share_stats')

    conf = driver.CONF
    expected_stats = {
        'share_backend_name': self._driver.backend_name,
        'driver_handles_share_servers':
            self._driver.driver_handles_share_servers,
        'vendor_name': 'Hitachi',
        'driver_version': '4.0.0',
        'storage_protocol': 'NFS_CIFS',
        'total_capacity_gb': 1000,
        'free_capacity_gb': 200,
        'reserved_percentage': conf.reserved_share_percentage,
        'reserved_snapshot_percentage':
            conf.reserved_share_from_snapshot_percentage,
        'reserved_share_extend_percentage':
            conf.reserved_share_extend_percentage,
        'qos': False,
        'thin_provisioning': True,
        'dedupe': True,
        'revert_to_snapshot_support': True,
        'mount_snapshot_support': True,
    }

    self._driver._update_share_stats()

    self.assertTrue(self._driver.hnas.get_stats.called)
    base_update = manila.share.driver.ShareDriver._update_share_stats
    base_update.assert_called_once_with(expected_stats)
    self.assertTrue(self.mock_log.info.called)

1001 

@ddt.data(snapshot_nfs, snapshot_cifs,
          snapshot_mount_support_nfs, snapshot_mount_support_cifs)
def test_ensure_snapshot(self, snapshot):
    """ensure_snapshot returns exports only for mountable snapshots.

    For snapshots whose share has mount_snapshot_support set, the export
    check matching the share protocol must be the only one called;
    otherwise the expected result is None. The snapshot directory is
    checked in every case.
    """
    result = self._driver.ensure_snapshot('context', snapshot)

    backend = ssh.HNASSSHBackend
    expected = None
    if snapshot['share'].get('mount_snapshot_support'):
        snapshot_id = snapshot['id']
        protocol = snapshot['share']['share_proto']
        expected = [
            self._get_export(
                snapshot_id, protocol, self._driver.hnas_evs_ip,
                False, is_snapshot=True),
            self._get_export(
                snapshot_id, protocol,
                self._driver.hnas_admin_network_ip, True,
                is_snapshot=True),
        ]

        if protocol.lower() == 'nfs':
            backend.check_export.assert_called_once_with(
                snapshot_id, is_snapshot=True)
            self.assertFalse(backend.check_cifs.called)
        else:
            backend.check_cifs.assert_called_once_with(snapshot_id)
            self.assertFalse(backend.check_export.called)

    backend.check_directory.assert_called_once_with(
        snapshot['provider_location'])
    self.assertEqual(expected, result)

1031 

def test_manage_existing_snapshot(self):
    """Managing an existing snapshot returns the requested size.

    The snapshot directory must be checked and _ensure_snapshot called
    with the fourth '/'-separated component of the provider location.
    """
    self.mock_object(ssh.HNASSSHBackend, 'check_directory',
                     mock.Mock(return_value=True))
    self.mock_object(self._driver, '_ensure_snapshot',
                     mock.Mock(return_value=[]))
    hnas_snapshot_id = manage_snapshot['provider_location'].split('/')[3]
    driver_options = {'size': 20}

    managed = self._driver.manage_existing_snapshot(
        manage_snapshot, driver_options)

    expected_path = ('/snapshots/aa4a7710-f326-41fb-ad18-b4ad587fc87a'
                     '/snapshot18-05-2106')
    ssh.HNASSSHBackend.check_directory.assert_called_with(expected_path)
    self._driver._ensure_snapshot.assert_called_with(
        manage_snapshot, hnas_snapshot_id)
    self.assertEqual(20, managed['size'])
    for log_call in (self.mock_log.debug, self.mock_log.info):
        self.assertTrue(log_call.called)

1053 

@ddt.data(None, exception.HNASItemNotFoundException('Fake error.'))
def test_manage_existing_snapshot_with_mount_support(self, exc):
    """Manage a mountable snapshot, with _ensure_snapshot ok or failing.

    When _ensure_snapshot is made to raise the given exception, the test
    additionally expects _create_export to be called for the snapshot.
    """
    snapshot = snapshot_mount_support_nfs
    export_locations = [{
        'path': '172.24.44.10:/snapshots/'
                '3377b015-a695-4a5a-8aa5-9b931b023380'}]

    self.mock_object(ssh.HNASSSHBackend, 'check_directory',
                     mock.Mock(return_value=True))
    self.mock_object(self._driver, '_ensure_snapshot',
                     mock.Mock(return_value=[], side_effect=exc))
    self.mock_object(self._driver, '_get_export_locations',
                     mock.Mock(return_value=export_locations))
    if exc:
        self.mock_object(self._driver, '_create_export')

    hnas_snapshot_id = snapshot['provider_location'].split('/')[3]
    driver_options = {'size': 20, 'export_locations': export_locations}

    managed = self._driver.manage_existing_snapshot(
        snapshot, driver_options)

    expected_path = ('/snapshots/62125744-fcdd-4f55-a8c1-d1498102f634'
                     '/3377b015-a695-4a5a-8aa5-9b931b023380')
    ssh.HNASSSHBackend.check_directory.assert_called_with(expected_path)
    self._driver._ensure_snapshot.assert_called_with(
        snapshot, hnas_snapshot_id)
    self._driver._get_export_locations.assert_called_with(
        snapshot['share']['share_proto'],
        hnas_snapshot_id,
        is_snapshot=True)
    if exc:
        self._driver._create_export.assert_called_with(
            snapshot['share_id'],
            snapshot['share']['share_proto'],
            snapshot_id=hnas_snapshot_id)
    self.assertEqual(20, managed['size'])
    self.assertEqual(export_locations, managed['export_locations'])
    for log_call in (self.mock_log.debug, self.mock_log.info):
        self.assertTrue(log_call.called)

1095 

@ddt.data('fake_size', '128GB', '512 GB', {'size': 128})
def test_manage_snapshot_invalid_size_exception(self, size):
    """Each of the given size values is rejected when managing."""
    driver_options = {'size': size}

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        manage_snapshot, driver_options)

1101 

def test_manage_snapshot_size_not_provided_exception(self):
    """Empty driver options are rejected when managing a snapshot."""
    no_options = {}

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        manage_snapshot, no_options)

1106 

@ddt.data('/root/snapshot_id', '/snapshots/share1/snapshot_id',
          '/directory1', 'snapshots/share1/snapshot_id')
def test_manage_snapshot_invalid_path_exception(self, path):
    """Each of the given provider locations is rejected when managing."""
    # Override the location on a shallow copy so the shared fixture is
    # left untouched.
    snapshot = dict(manage_snapshot, provider_location=path)

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        snapshot, {'size': 20})
    self.assertTrue(self.mock_log.debug.called)

1117 

def test_manage_inexistent_snapshot_exception(self):
    """Managing fails when check_directory reports False."""
    missing_directory = mock.Mock(return_value=False)
    self.mock_object(
        ssh.HNASSSHBackend, 'check_directory', missing_directory)

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        manage_snapshot, {'size': 20})
    self.assertTrue(self.mock_log.debug.called)

1126 

def test_unmanage_snapshot(self):
    """unmanage_snapshot logs at info level."""
    self._driver.unmanage_snapshot(snapshot_nfs)

    log_info = self.mock_log.info
    self.assertTrue(log_info.called)

1130 

@ddt.data({'snap': snapshot_nfs, 'exc': None},
          {'snap': snapshot_cifs, 'exc': None},
          {'snap': snapshot_nfs,
           'exc': exception.HNASNothingToCloneException('fake')},
          {'snap': snapshot_cifs,
           'exc': exception.HNASNothingToCloneException('fake')})
@ddt.unpack
def test_revert_to_snapshot(self, exc, snap):
    """Revert a share to a snapshot, with tree_clone ok or raising.

    The share tree is expected to be deleted, recreated with its quota
    and cloned from the snapshot path. When tree_clone raises the given
    exception a warning must be logged as well.
    """
    backend = ssh.HNASSSHBackend
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted")
    for method_name in ('tree_delete', 'vvol_create', 'quota_add'):
        self.mock_object(backend, method_name)
    self.mock_object(backend, 'tree_clone', mock.Mock(side_effect=exc))

    self._driver.revert_to_snapshot('context', snap, None, None)

    share_id = snap['share_id']
    share_path = '/'.join(('/shares', share_id))
    snapshot_path = '/'.join(('/snapshots', share_id, snap['id']))

    driver.HitachiHNASDriver._check_fs_mounted.assert_called_once_with()
    backend.tree_delete.assert_called_once_with(share_path)
    backend.vvol_create.assert_called_once_with(share_id)
    backend.quota_add.assert_called_once_with(share_id, 2)
    backend.tree_clone.assert_called_once_with(snapshot_path, share_path)
    backend.check_directory.assert_called_once_with(
        snap['provider_location'])

    if exc:
        self.assertTrue(self.mock_log.warning.called)
    self.assertTrue(self.mock_log.info.called)

1164 

def test_nfs_snapshot_update_access_allow(self):
    """Added IP rules reach the backend as '<ip>(ro)' entries."""
    access1 = {'access_type': 'ip', 'access_to': '172.24.10.10'}
    access2 = {'access_type': 'ip', 'access_to': '172.31.20.20'}
    access_list = [access1, access2]
    backend = ssh.HNASSSHBackend

    self.mock_object(backend, "update_nfs_access_rule")

    self._driver.snapshot_update_access(
        'ctxt', snapshot_nfs, access_list, access_list, [])

    expected_hosts = [
        rule['access_to'] + '(ro)' for rule in (access1, access2)]
    backend.update_nfs_access_rule.assert_called_once_with(
        expected_hosts, snapshot_id=snapshot_nfs['id'])
    backend.check_directory.assert_called_once_with(
        snapshot_nfs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)

1187 

def test_nfs_snapshot_update_access_deny(self):
    """With only a delete rule, the backend gets an empty host list."""
    access1 = {'access_type': 'ip', 'access_to': '172.24.10.10'}
    backend = ssh.HNASSSHBackend

    self.mock_object(backend, "update_nfs_access_rule")

    self._driver.snapshot_update_access(
        'ctxt', snapshot_nfs, [], [], [access1])

    backend.update_nfs_access_rule.assert_called_once_with(
        [], snapshot_id=snapshot_nfs['id'])
    backend.check_directory.assert_called_once_with(
        snapshot_nfs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)

1204 

def test_nfs_snapshot_update_access_invalid_access_type(self):
    """A 'user' access rule on an NFS snapshot is rejected.

    The snapshot directory is still expected to have been checked once.
    """
    user_rule = {'access_type': 'user', 'access_to': 'user1'}

    self.assertRaises(
        exception.InvalidSnapshotAccess,
        self._driver.snapshot_update_access,
        'ctxt', snapshot_nfs, [user_rule], [], [])

    directory_check = ssh.HNASSSHBackend.check_directory
    directory_check.assert_called_once_with(
        snapshot_nfs['provider_location'])

1216 

def test_cifs_snapshot_update_access_allow(self):
    """An added user rule is allowed on the CIFS snapshot with 'ar'."""
    access1 = {'access_type': 'user', 'access_to': 'fake_user1'}
    backend = ssh.HNASSSHBackend

    self.mock_object(backend, 'cifs_allow_access')

    self._driver.snapshot_update_access(
        'ctxt', snapshot_cifs, [access1], [access1], [])

    backend.cifs_allow_access.assert_called_with(
        snapshot_cifs['id'], access1['access_to'], 'ar',
        is_snapshot=True)
    backend.check_directory.assert_called_once_with(
        snapshot_cifs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)

1233 

def test_cifs_snapshot_update_access_deny(self):
    """A deleted user rule is denied on the CIFS snapshot."""
    access1 = {'access_type': 'user', 'access_to': 'fake_user1'}
    backend = ssh.HNASSSHBackend

    self.mock_object(backend, 'cifs_deny_access')

    self._driver.snapshot_update_access(
        'ctxt', snapshot_cifs, [], [], [access1])

    backend.cifs_deny_access.assert_called_with(
        snapshot_cifs['id'], access1['access_to'], is_snapshot=True)
    backend.check_directory.assert_called_once_with(
        snapshot_cifs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)

1250 

def test_cifs_snapshot_update_access_recovery_mode(self):
    """Update CIFS snapshot access with empty add and delete rule lists.

    Only the full access list is supplied. The test expects the current
    permissions to be listed once, and the most recent deny and allow
    calls to target the domain user in its quoted and backslash-escaped
    forms respectively.
    """
    access1 = {'access_type': 'user', 'access_to': 'fake_user1'}
    access2 = {'access_type': 'user', 'access_to': 'HDS\\fake_user2'}
    access_list = [access1, access2]
    permission_list = [('fake_user1', 'ar'), ('HDS\\fake_user2', 'ar')]
    formatted_user = r'"\{1}{0}\{1}"'.format(access2['access_to'], '"')
    backend = ssh.HNASSSHBackend

    self.mock_object(backend, 'list_cifs_permissions',
                     mock.Mock(return_value=permission_list))
    for method_name in ('cifs_deny_access', 'cifs_allow_access'):
        self.mock_object(backend, method_name)

    self._driver.snapshot_update_access(
        'ctxt', snapshot_cifs, access_list, [], [])

    escaped_user = access2['access_to'].replace('\\', '\\\\')
    backend.list_cifs_permissions.assert_called_once_with(
        snapshot_cifs['id'])
    backend.cifs_deny_access.assert_called_with(
        snapshot_cifs['id'], formatted_user, is_snapshot=True)
    backend.cifs_allow_access.assert_called_with(
        snapshot_cifs['id'], escaped_user, 'ar', is_snapshot=True)
    backend.check_directory.assert_called_once_with(
        snapshot_cifs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)