Coverage for manila/tests/share/drivers/hitachi/hnas/test_driver.py: 100%
548 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 Hitachi Data Systems, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from unittest import mock
18import ddt
19from oslo_config import cfg
21from manila import exception
22import manila.share.configuration
23import manila.share.driver
24from manila.share.drivers.hitachi.hnas import driver
25from manila.share.drivers.hitachi.hnas import ssh
26from manila import test
28CONF = cfg.CONF
# NFS share fixture on host 'hnas'; its export path uses the
# '<ip>:/shares/<id>' form.
30share_nfs = {
31 'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
32 'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
33 'size': 50,
34 'host': 'hnas',
35 'share_proto': 'NFS',
36 'share_type_id': 1,
37 'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
38 'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
39 'export_locations': [{'path': '172.24.44.10:/shares/'
40 'aa4a7710-f326-41fb-ad18-b4ad587fc87a'}],
41}
# CIFS share fixture on host 'hnas'; its export path uses the
# '\\<ip>\<id>' form.
43share_cifs = {
44 'id': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
45 'name': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
46 'size': 50,
47 'host': 'hnas',
48 'share_proto': 'CIFS',
49 'share_type_id': 1,
50 'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
51 'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
52 'export_locations': [{'path': '\\\\172.24.44.10\\'
53 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7'}],
54}
# Same data as share_nfs except that 'host' is 'invalid'; used by the
# manage_existing invalid-host test.
56share_invalid_host = {
57 'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
58 'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
59 'size': 50,
60 'host': 'invalid',
61 'share_proto': 'NFS',
62 'share_type_id': 1,
63 'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
64 'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
65 'export_locations': [{'path': '172.24.44.10:/shares/'
66 'aa4a7710-f326-41fb-ad18-b4ad587fc87a'}],
67}
# NFS share fixture with 'mount_snapshot_support' set to True.
69share_mount_support_nfs = {
70 'id': '62125744-fcdd-4f55-a8c1-d1498102f634',
71 'name': '62125744-fcdd-4f55-a8c1-d1498102f634',
72 'size': 50,
73 'host': 'hnas',
74 'share_proto': 'NFS',
75 'share_type_id': 1,
76 'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
77 'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
78 'export_locations': [{'path': '172.24.44.10:/shares/'
79 '62125744-fcdd-4f55-a8c1-d1498102f634'}],
80 'mount_snapshot_support': True,
81}
# CIFS share fixture with 'mount_snapshot_support' set to True.
# NOTE(review): the export path below uses the NFS-style
# '<ip>:/shares/<id>' form although share_proto is 'CIFS' - confirm this
# is intentional.
83share_mount_support_cifs = {
84 'id': 'd6e7dc6b-f65f-49d9-968d-936f75474f29',
85 'name': 'd6e7dc6b-f65f-49d9-968d-936f75474f29',
86 'size': 50,
87 'host': 'hnas',
88 'share_proto': 'CIFS',
89 'share_type_id': 1,
90 'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
91 'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
92 'export_locations': [{'path': '172.24.44.10:/shares/'
93 'd6e7dc6b-f65f-49d9-968d-936f75474f29'}],
94 'mount_snapshot_support': True,
95}
# Read-write IP access rule.
# NOTE(review): the first group of 'id' has nine characters, so the value
# is not a well-formed UUID - confirm whether that matters to any test.
97access_nfs_rw = {
98 'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
99 'access_type': 'ip',
100 'access_to': '172.24.44.200',
101 'access_level': 'rw',
102 'state': 'active',
103}
# Read-write user access rule for 'fake_user'.
105access_cifs_rw = {
106 'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
107 'access_type': 'user',
108 'access_to': 'fake_user',
109 'access_level': 'rw',
110 'state': 'active',
111}
# Read-only user access rule for 'fake_user'.
113access_cifs_ro = {
114 'id': '32407088-1f4f-40e9-b899-b9a4176b574d',
115 'access_type': 'user',
116 'access_to': 'fake_user',
117 'access_level': 'ro',
118 'state': 'active',
119}
# Snapshot of share_nfs; provider_location is
# '/snapshots/<share id>/<snapshot id>'.
121snapshot_nfs = {
122 'id': 'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f',
123 'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
124 'share': share_nfs,
125 'provider_location': '/snapshots/aa4a7710-f326-41fb-ad18-b4ad587fc87a/'
126 'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f',
127 'size': 2,
128}
# Snapshot of share_cifs; provider_location is
# '/snapshots/<share id>/<snapshot id>'.
130snapshot_cifs = {
131 'id': '91bc6e1b-1ba5-f29c-abc1-da7618cabf0a',
132 'share_id': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
133 'share': share_cifs,
134 'provider_location': '/snapshots/f5cadaf2-afbe-4cc4-9021-85491b6b76f7/'
135 'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f',
136 'size': 2,
137}
# Snapshot of share_nfs whose provider_location ends in a custom name
# ('snapshot18-05-2106') instead of the snapshot id; used by the managed
# snapshot deletion test.
139manage_snapshot = {
140 'id': 'bc168eb-fa71-beef-153a-3d451aa1351f',
141 'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
142 'share': share_nfs,
143 'provider_location': '/snapshots/aa4a7710-f326-41fb-ad18-b4ad587fc87a'
144 '/snapshot18-05-2106',
145}
# Snapshot of share_mount_support_nfs.
147snapshot_mount_support_nfs = {
148 'id': '3377b015-a695-4a5a-8aa5-9b931b023380',
149 'share_id': '62125744-fcdd-4f55-a8c1-d1498102f634',
150 'share': share_mount_support_nfs,
151 'provider_location': '/snapshots/62125744-fcdd-4f55-a8c1-d1498102f634'
152 '/3377b015-a695-4a5a-8aa5-9b931b023380',
153}
# Snapshot of share_mount_support_cifs.
155snapshot_mount_support_cifs = {
156 'id': 'f9916515-5cb8-4612-afa6-7f2baa74223a',
157 'share_id': 'd6e7dc6b-f65f-49d9-968d-936f75474f29',
158 'share': share_mount_support_cifs,
159 'provider_location': '/snapshots/d6e7dc6b-f65f-49d9-968d-936f75474f29'
160 '/f9916515-5cb8-4612-afa6-7f2baa74223a',
161}
# Share fixture whose protocol ('HDFS') is neither NFS nor CIFS.
163invalid_share = {
164 'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
165 'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
166 'size': 100,
167 'host': 'hnas',
168 'share_proto': 'HDFS',
169}
# Snapshot fixture pointing at invalid_share.
171invalid_snapshot = {
172 'id': '24dcdcb5-a582-4bcc-b462-641da143afee',
173 'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
174 'share': invalid_share,
175}
# Access rule with access_type 'cert'.
# NOTE(review): not referenced in the visible part of the file - confirm
# it is still used.
177invalid_access_type = {
178 'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
179 'access_type': 'cert',
180 'access_to': 'manila_user',
181 'access_level': 'rw',
182 'state': 'active',
183}
# Access rule with access_level '777'.
# NOTE(review): not referenced in the visible part of the file - confirm
# it is still used.
185invalid_access_level = {
186 'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
187 'access_type': 'ip',
188 'access_to': 'manila_user',
189 'access_level': '777',
190 'state': 'active',
191}
# Error message the tests expect for invalid_share, with its id and
# protocol interpolated.
193invalid_protocol_msg = ("Share backend error: Only NFS or CIFS protocol are "
194 "currently supported. Share provided %(id)s with "
195 "protocol %(proto)s." %
196 {'id': invalid_share['id'],
197 'proto': invalid_share['share_proto']})
200@ddt.ddt
201class HitachiHNASTestCase(test.TestCase):
def setUp(self):
    """Set fake HNAS options on CONF and build the driver under test.

    Also replaces the driver module's LOG and the HNASSSHBackend check_*
    methods with mocks so each test starts from the same backend state.
    """
    super(HitachiHNASTestCase, self).setUp()
    CONF.set_default('driver_handles_share_servers', False)

    # Assigned directly on the global CONF object, in this order.
    hnas_options = (
        ('hitachi_hnas_evs_id', '2'),
        ('hitachi_hnas_evs_ip', '172.24.44.10'),
        ('hitachi_hnas_admin_network_ip', '10.20.30.40'),
        ('hitachi_hnas_ip', '172.24.44.1'),
        ('hitachi_hnas_ip_port', 'hitachi_hnas_ip_port'),
        ('hitachi_hnas_user', 'hitachi_hnas_user'),
        ('hitachi_hnas_password', 'hitachi_hnas_password'),
        ('hitachi_hnas_file_system_name', 'file_system'),
        ('hitachi_hnas_ssh_private_key', 'private_key'),
        ('hitachi_hnas_cluster_admin_ip0', None),
        ('hitachi_hnas_stalled_job_timeout', 10),
        ('hitachi_hnas_driver_helper',
         'manila.share.drivers.hitachi.hnas.'
         'ssh.HNASSSHBackend'),
    )
    for option_name, option_value in hnas_options:
        setattr(CONF, option_name, option_value)
    self.fake_conf = manila.share.configuration.Configuration(None)

    # Private storage stub whose get/delete both return None.
    self.fake_private_storage = mock.Mock()
    for storage_method in ('get', 'delete'):
        self.mock_object(self.fake_private_storage, storage_method,
                         mock.Mock(return_value=None))

    self._driver = driver.HitachiHNASDriver(
        private_storage=self.fake_private_storage,
        configuration=self.fake_conf)
    self._driver.backend_name = "hnas"
    self.mock_log = self.mock_object(driver, 'LOG')

    # mocking common backend calls
    self.mock_object(ssh.HNASSSHBackend, "check_fs_mounted",
                     mock.Mock(return_value=True))
    for check_name in ("check_vvol", "check_quota", "check_cifs",
                       "check_export", 'check_directory'):
        self.mock_object(ssh.HNASSSHBackend, check_name)
@ddt.data('hitachi_hnas_driver_helper', 'hitachi_hnas_evs_id',
          'hitachi_hnas_evs_ip', 'hitachi_hnas_ip', 'hitachi_hnas_user')
def test_init_invalid_conf_parameters(self, attr_name):
    """Driver __init__ raises InvalidParameterValue if attr_name is None.

    :param attr_name: name of the CONF option that is set to None.
    """
    self.mock_object(manila.share.driver.ShareDriver, '__init__')
    setattr(CONF, attr_name, None)

    reinitialize = self._driver.__init__
    self.assertRaises(exception.InvalidParameterValue, reinitialize)
def test_init_invalid_credentials(self):
    """Driver __init__ fails when password and SSH key are both None."""
    self.mock_object(manila.share.driver.ShareDriver, '__init__')
    for credential in ('hitachi_hnas_password',
                       'hitachi_hnas_ssh_private_key'):
        setattr(CONF, credential, None)

    reinitialize = self._driver.__init__
    self.assertRaises(exception.InvalidParameterValue, reinitialize)
@ddt.data(True, False)
def test_update_access_nfs(self, empty_rules):
    """update_access on an NFS share forwards the formatted host list.

    :param empty_rules: when True the access list is empty and an empty
        host list is expected; otherwise one rw and one ro IP rule are
        sent, and the rw entry is expected to carry ',norootsquash'.
    """
    rules = []
    expected_hosts = []
    if not empty_rules:
        rw_rule = {
            'access_type': 'ip',
            'access_to': '172.24.10.10',
            'access_level': 'rw'
        }
        ro_rule = {
            'access_type': 'ip',
            'access_to': '188.100.20.10',
            'access_level': 'ro'
        }
        rules = [rw_rule, ro_rule]
        expected_hosts = [
            ''.join((rw_rule['access_to'], '(',
                     rw_rule['access_level'], ',norootsquash)')),
            ''.join((ro_rule['access_to'], '(',
                     ro_rule['access_level'], ')')),
        ]

    self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
                     mock.Mock())

    self._driver.update_access('context', share_nfs, rules, [], [], [])

    ssh.HNASSSHBackend.update_nfs_access_rule.assert_called_once_with(
        expected_hosts, share_id=share_nfs['id'])
    self.assertTrue(self.mock_log.debug.called)
def test_update_access_ip_exception(self):
    """An NFS rule with an unknown access_type raises InvalidShareAccess."""
    valid_rule = {
        'access_type': 'ip',
        'access_to': '188.100.20.10',
        'access_level': 'ro'
    }
    bad_type_rule = {
        'access_type': 'something',
        'access_to': '172.24.10.10',
        'access_level': 'rw'
    }
    call_args = ('context', share_nfs, [valid_rule, bad_type_rule],
                 [], [], [])

    self.assertRaises(exception.InvalidShareAccess,
                      self._driver.update_access, *call_args)
def test_update_access_not_found_exception(self):
    """HNASItemNotFoundException surfaces as ShareResourceNotFound.

    _ensure_share is mocked to raise HNASItemNotFoundException, and
    update_access is expected to raise ShareResourceNotFound.
    """
    rules = [
        {
            'access_type': 'ip',
            'access_to': '188.100.20.10',
            'access_level': 'ro'
        },
        {
            'access_type': 'something',
            'access_to': '172.24.10.10',
            'access_level': 'rw'
        },
    ]
    not_found = exception.HNASItemNotFoundException(msg='fake')
    self.mock_object(self._driver, '_ensure_share',
                     mock.Mock(side_effect=not_found))

    self.assertRaises(exception.ShareResourceNotFound,
                      self._driver.update_access, 'context', share_nfs,
                      rules, add_rules=[], delete_rules=[],
                      update_rules=[])
@ddt.data([access_cifs_rw, 'acr'], [access_cifs_ro, 'ar'])
@ddt.unpack
def test_allow_access_cifs(self, access_cifs, permission):
    """Adding a CIFS rule calls cifs_allow_access with the permission.

    :param access_cifs: the access rule passed in as the only add rule.
    :param permission: permission string expected in the backend call.
    """
    self.mock_object(ssh.HNASSSHBackend, 'cifs_allow_access')
    rules_to_add = [access_cifs]

    self._driver.update_access('context', share_cifs, [], rules_to_add,
                               [], [])

    allow_mock = ssh.HNASSSHBackend.cifs_allow_access
    allow_mock.assert_called_once_with(
        share_cifs['id'], 'fake_user', permission, is_snapshot=False)
    self.assertTrue(self.mock_log.debug.called)
def test_allow_access_cifs_invalid_type(self):
    """Adding an 'ip' rule to a CIFS share raises InvalidShareAccess."""
    ip_rule = {
        'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
        'access_type': 'ip',
        'access_to': 'fake_user',
        'access_level': 'rw',
        'state': 'active',
    }
    call_args = ('context', share_cifs, [], [ip_rule], [], [])

    self.assertRaises(exception.InvalidShareAccess,
                      self._driver.update_access, *call_args)
def test_deny_access_cifs(self):
    """Deleting a CIFS user rule calls cifs_deny_access for that user."""
    self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access')
    rules_to_delete = [access_cifs_rw]

    self._driver.update_access('context', share_cifs, [], [],
                               rules_to_delete, [])

    deny_mock = ssh.HNASSSHBackend.cifs_deny_access
    deny_mock.assert_called_once_with(
        share_cifs['id'], 'fake_user', is_snapshot=False)
    self.assertTrue(self.mock_log.debug.called)
def test_deny_access_cifs_unsupported_type(self):
    """Deleting an 'ip' rule from a CIFS share logs a warning."""
    ip_rule = {
        'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
        'access_type': 'ip',
        'access_to': 'fake_user',
        'access_level': 'rw',
        'state': 'active',
    }
    self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access')

    self._driver.update_access('context', share_cifs, [], [],
                               [ip_rule], [])

    self.assertTrue(self.mock_log.warning.called)
def test_update_access_invalid_share_protocol(self):
    """update_access on a non NFS/CIFS share raises the protocol error."""
    self.mock_object(self._driver, '_ensure_share')
    call_args = ('context', invalid_share, [], [], [], [])

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.update_access, *call_args)

    self.assertEqual(invalid_protocol_msg, raised.msg)
def test_update_access_cifs_recovery_mode(self):
    """A full CIFS access list makes the driver list permissions.

    update_access is called with only the complete access list (no
    add/delete/update rules), and list_cifs_permissions is expected to be
    queried once for the share.
    """
    backend = ssh.HNASSSHBackend
    current_permissions = [('fake_user1', 'acr'), ('fake_user2', 'ar')]
    self.mock_object(backend, 'list_cifs_permissions',
                     mock.Mock(return_value=current_permissions))
    for method_name in ('cifs_deny_access', 'cifs_allow_access'):
        self.mock_object(backend, method_name)

    full_rules = [access_cifs_rw, access_cifs_ro]
    self._driver.update_access('context', share_cifs, full_rules,
                               [], [], [])

    backend.list_cifs_permissions.assert_called_once_with(
        share_cifs['id'])
    self.assertTrue(self.mock_log.debug.called)
407 def _get_export(self, id, share_proto, ip, is_admin_only,
408 is_snapshot=False):
409 if share_proto.lower() == 'nfs':
410 if is_snapshot:
411 path = '/snapshots/' + id
412 else:
413 path = '/shares/' + id
414 export = ':'.join((ip, path))
415 else:
416 export = r'\\%s\%s' % (ip, id)
418 return {
419 "path": export,
420 "is_admin_only": is_admin_only,
421 "metadata": {},
422 }
@ddt.data(share_nfs, share_cifs)
def test_create_share(self, share):
    """create_share creates vvol, quota and the protocol's export.

    :param share: NFS or CIFS share fixture; only the matching backend
        export call is expected, and the result must equal the regular
        and admin-only export locations.
    """
    backend = ssh.HNASSSHBackend
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add"):
        self.mock_object(backend, method_name)
    self.mock_object(backend, "nfs_export_add", mock.Mock(
        return_value='/shares/' + share['id']))
    self.mock_object(backend, "cifs_share_add")

    result = self._driver.create_share('context', share)

    self.assertTrue(self.mock_log.debug.called)
    backend.vvol_create.assert_called_once_with(share['id'])
    backend.quota_add.assert_called_once_with(share['id'], share['size'])

    addresses = ((self._driver.hnas_evs_ip, False),
                 (self._driver.hnas_admin_network_ip, True))
    expected = [
        self._get_export(share['id'], share['share_proto'], address,
                         admin_only)
        for address, admin_only in addresses]

    if share['share_proto'].lower() != 'nfs':
        backend.cifs_share_add.assert_called_once_with(
            share_cifs['id'], snapshot_id=None)
        self.assertFalse(backend.nfs_export_add.called)
    else:
        backend.nfs_export_add.assert_called_once_with(
            share_nfs['id'], snapshot_id=None)
        self.assertFalse(backend.cifs_share_add.called)
    self.assertEqual(expected, result)
def test_create_share_export_error(self):
    """A failing nfs_export_add propagates and the vvol is deleted."""
    backend = ssh.HNASSSHBackend
    share_id = share_nfs['id']
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add"):
        self.mock_object(backend, method_name)
    self.mock_object(backend, "nfs_export_add", mock.Mock(
        side_effect=exception.HNASBackendException('msg')))
    self.mock_object(backend, "vvol_delete")

    self.assertRaises(exception.HNASBackendException,
                      self._driver.create_share, 'context', share_nfs)

    self.assertTrue(self.mock_log.debug.called)
    backend.vvol_create.assert_called_once_with(share_id)
    backend.quota_add.assert_called_once_with(share_id,
                                              share_nfs['size'])
    backend.nfs_export_add.assert_called_once_with(
        share_id, snapshot_id=None)
    backend.vvol_delete.assert_called_once_with(share_id)
def test_create_share_invalid_share_protocol(self):
    """create_share on a non NFS/CIFS share raises the protocol error."""
    self.mock_object(driver.HitachiHNASDriver, "_create_share",
                     mock.Mock(return_value="path"))
    call_args = ('context', invalid_share)

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.create_share, *call_args)

    self.assertEqual(invalid_protocol_msg, raised.msg)
@ddt.data(share_nfs, share_cifs)
def test_delete_share(self, share):
    """delete_share removes the protocol's export and deletes the vvol.

    :param share: NFS or CIFS share fixture; only the matching backend
        removal call is expected.
    """
    backend = ssh.HNASSSHBackend
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("nfs_export_del", "cifs_share_del",
                        "vvol_delete"):
        self.mock_object(backend, method_name)

    self._driver.delete_share('context', share)

    self.assertTrue(self.mock_log.debug.called)
    backend.vvol_delete.assert_called_once_with(share['id'])

    if share['share_proto'].lower() != 'nfs':
        backend.cifs_share_del.assert_called_once_with(share['id'])
        self.assertFalse(backend.nfs_export_del.called)
    else:
        backend.nfs_export_del.assert_called_once_with(share['id'])
        self.assertFalse(backend.cifs_share_del.called)
@ddt.data(snapshot_nfs, snapshot_cifs, snapshot_mount_support_nfs,
          snapshot_mount_support_cifs)
def test_create_snapshot(self, snapshot):
    """create_snapshot clones the share tree and reports its location.

    :param snapshot: snapshot fixture. For NFS parents the host list is
        expected to be switched to read-only and then restored; for CIFS
        parents is_cifs_in_use is expected to be checked. Export
        locations are expected only when the parent share has
        'mount_snapshot_support'.
    """
    backend = ssh.HNASSSHBackend
    parent_share = snapshot['share']
    hnas_id = snapshot['share_id']
    snapshot_path = '/snapshots/' + hnas_id + '/' + snapshot['id']

    # Host list returned by the mocked backend before the snapshot.
    access_list = ['172.24.44.200(rw,norootsquash)',
                   '172.24.49.180(all_squash,read_write,secure)',
                   '172.24.49.110(ro, secure)',
                   '172.24.49.112(secure,readwrite,norootsquash)',
                   '172.24.49.142(read_only, secure)',
                   '172.24.49.201(rw,read_write,readwrite)',
                   '172.24.49.218(rw)']

    # The same hosts with every read-write option expected as 'ro'.
    ro_list = ['172.24.44.200(ro,norootsquash)',
               '172.24.49.180(all_squash,ro,secure)',
               '172.24.49.110(ro, secure)',
               '172.24.49.112(secure,ro,norootsquash)',
               '172.24.49.142(read_only, secure)',
               '172.24.49.201(ro,ro,ro)',
               '172.24.49.218(ro)']

    expected = {'provider_location': snapshot_path}
    if parent_share.get('mount_snapshot_support'):
        addresses = ((self._driver.hnas_evs_ip, False),
                     (self._driver.hnas_admin_network_ip, True))
        expected['export_locations'] = [
            self._get_export(snapshot['id'], parent_share['share_proto'],
                             address, admin_only, is_snapshot=True)
            for address, admin_only in addresses]

    self.mock_object(backend, "get_nfs_host_list", mock.Mock(
        return_value=access_list))
    self.mock_object(backend, "update_nfs_access_rule", mock.Mock())
    self.mock_object(backend, "is_cifs_in_use", mock.Mock(
        return_value=False))
    for method_name in ("tree_clone", "nfs_export_add",
                        "cifs_share_add"):
        self.mock_object(backend, method_name)

    out = self._driver.create_snapshot('context', snapshot)

    backend.tree_clone.assert_called_once_with(
        '/shares/' + hnas_id, snapshot_path)
    self.assertEqual(expected, out)

    if parent_share['share_proto'].lower() != 'nfs':
        backend.is_cifs_in_use.assert_called_once_with(hnas_id)
    else:
        backend.get_nfs_host_list.assert_called_once_with(hnas_id)
        for host_list in (ro_list, access_list):
            backend.update_nfs_access_rule.assert_any_call(
                host_list, share_id=hnas_id)
def test_create_snapshot_invalid_protocol(self):
    """create_snapshot on a non NFS/CIFS share raises the protocol error."""
    self.mock_object(self._driver, '_ensure_share')
    call_args = ('context', invalid_snapshot)

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.create_snapshot, *call_args)

    self.assertEqual(invalid_protocol_msg, raised.msg)
def test_create_snapshot_cifs_exception(self):
    """A CIFS snapshot is refused while is_cifs_in_use returns True."""
    self.mock_object(ssh.HNASSSHBackend, "is_cifs_in_use", mock.Mock(
        return_value=True))
    expected_msg = ("Share backend error: CIFS snapshot when share is "
                    "mounted is disabled. Set "
                    "hitachi_hnas_allow_cifs_snapshot_while_mounted to "
                    "True or unmount the share to take a snapshot.")

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.create_snapshot, 'context',
                               snapshot_cifs)

    self.assertEqual(expected_msg, raised.msg)
def test_create_snapshot_first_snapshot(self):
    """Nothing to clone: the snapshot directory is created instead.

    tree_clone is mocked to raise HNASNothingToCloneException. A warning
    is expected, together with a create_directory call for the snapshot
    path.
    """
    backend = ssh.HNASSSHBackend
    hnas_id = snapshot_nfs['share_id']
    self.mock_object(backend, "get_nfs_host_list", mock.Mock(
        return_value=['172.24.44.200(rw)']))
    self.mock_object(backend, "update_nfs_access_rule", mock.Mock())
    self.mock_object(backend, "tree_clone", mock.Mock(
        side_effect=exception.HNASNothingToCloneException('msg')))
    for method_name in ("create_directory", "nfs_export_add",
                        "cifs_share_add"):
        self.mock_object(backend, method_name)

    self._driver.create_snapshot('context', snapshot_nfs)

    self.assertTrue(self.mock_log.warning.called)
    backend.get_nfs_host_list.assert_called_once_with(hnas_id)
    for host_list in (['172.24.44.200(ro)'], ['172.24.44.200(rw)']):
        backend.update_nfs_access_rule.assert_any_call(
            host_list, share_id=hnas_id)
    backend.create_directory.assert_called_once_with(
        '/snapshots/' + hnas_id + '/' + snapshot_nfs['id'])
@ddt.data(snapshot_nfs, snapshot_cifs,
          snapshot_mount_support_nfs, snapshot_mount_support_cifs)
def test_delete_snapshot(self, snapshot):
    """delete_snapshot removes the snapshot tree and its parent directory.

    :param snapshot: snapshot fixture. The protocol's export removal is
        expected only when the parent share has 'mount_snapshot_support'.
    """
    backend = ssh.HNASSSHBackend
    hnas_share_id = snapshot['share_id']
    hnas_snapshot_id = snapshot['id']
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted")
    for method_name in ("tree_delete", "delete_directory",
                        "nfs_export_del", "cifs_share_del"):
        self.mock_object(backend, method_name)

    self._driver.delete_snapshot('context', snapshot)

    self.assertTrue(self.mock_log.debug.called)
    self.assertTrue(self.mock_log.info.called)
    driver.HitachiHNASDriver._check_fs_mounted.assert_called_once_with()
    backend.tree_delete.assert_called_once_with(
        '/snapshots/' + hnas_share_id + '/' + snapshot['id'])
    backend.delete_directory.assert_called_once_with(
        '/snapshots/' + hnas_share_id)

    is_nfs = snapshot['share']['share_proto'].lower() == 'nfs'
    mountable = snapshot['share'].get('mount_snapshot_support')
    if is_nfs and mountable:
        backend.nfs_export_del.assert_called_once_with(
            snapshot_id=hnas_snapshot_id)
    elif is_nfs:
        backend.nfs_export_del.assert_not_called()
    elif mountable:
        backend.cifs_share_del.assert_called_once_with(hnas_snapshot_id)
    else:
        backend.cifs_share_del.assert_not_called()
def test_delete_managed_snapshot(self):
    """A managed snapshot is deleted at its own provider_location."""
    backend = ssh.HNASSSHBackend
    hnas_id = manage_snapshot['share_id']
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted")
    for method_name in ("tree_delete", "delete_directory",
                        "nfs_export_del", "cifs_share_del"):
        self.mock_object(backend, method_name)

    self._driver.delete_snapshot('context', manage_snapshot)

    for log_method in (self.mock_log.debug, self.mock_log.info):
        self.assertTrue(log_method.called)
    driver.HitachiHNASDriver._check_fs_mounted.assert_called_once_with()
    backend.tree_delete.assert_called_once_with(
        manage_snapshot['provider_location'])
    backend.delete_directory.assert_called_once_with(
        '/snapshots/' + hnas_id)
@ddt.data(share_nfs, share_cifs)
def test_ensure_share(self, share):
    """ensure_share checks vvol, quota and the protocol's export.

    :param share: NFS or CIFS share fixture; only the matching backend
        check is expected, and the result must equal the regular and
        admin-only export locations.
    """
    backend = ssh.HNASSSHBackend

    result = self._driver.ensure_share('context', share)

    for check in (backend.check_vvol, backend.check_quota):
        check.assert_called_once_with(share['id'])

    addresses = ((self._driver.hnas_evs_ip, False),
                 (self._driver.hnas_admin_network_ip, True))
    expected = [
        self._get_export(share['id'], share['share_proto'], address,
                         admin_only)
        for address, admin_only in addresses]

    if share['share_proto'].lower() != 'nfs':
        backend.check_cifs.assert_called_once_with(share['id'])
        self.assertFalse(backend.check_export.called)
    else:
        backend.check_export.assert_called_once_with(share['id'])
        self.assertFalse(backend.check_cifs.called)
    self.assertEqual(expected, result)
def test_ensure_share_invalid_protocol(self):
    """ensure_share on a non NFS/CIFS share raises the protocol error."""
    call_args = ('context', invalid_share)

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.ensure_share, *call_args)

    self.assertEqual(invalid_protocol_msg, raised.msg)
def test_shrink_share(self):
    """Shrinking to 11 with a reported usage of 10 updates the quota."""
    backend = ssh.HNASSSHBackend
    new_size = 11
    self.mock_object(backend, "get_share_usage",
                     mock.Mock(return_value=10))
    self.mock_object(backend, "modify_quota")

    self._driver.shrink_share(share_nfs, new_size)

    backend.get_share_usage.assert_called_once_with(share_nfs['id'])
    backend.modify_quota.assert_called_once_with(share_nfs['id'],
                                                 new_size)
def test_shrink_share_new_size_lower_than_usage(self):
    """Shrinking to 9 with a reported usage of 10 is refused."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "get_share_usage",
                     mock.Mock(return_value=10))

    self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
                      self._driver.shrink_share, share_nfs, 9)

    backend.get_share_usage.assert_called_once_with(share_nfs['id'])
def test_extend_share(self):
    """extend_share to 150 updates the quota.

    get_stats is mocked to return (500, 200, True).
    """
    backend = ssh.HNASSSHBackend
    new_size = 150
    self.mock_object(backend, "get_stats",
                     mock.Mock(return_value=(500, 200, True)))
    self.mock_object(backend, "modify_quota")

    self._driver.extend_share(share_nfs, new_size)

    backend.get_stats.assert_called_once_with()
    backend.modify_quota.assert_called_once_with(share_nfs['id'],
                                                 new_size)
def test_extend_share_with_no_available_space_in_fs(self):
    """extend_share to 1000 raises HNASBackendException.

    get_stats is mocked to return (500, 200, False).
    """
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "get_stats",
                     mock.Mock(return_value=(500, 200, False)))
    self.mock_object(backend, "modify_quota")

    self.assertRaises(exception.HNASBackendException,
                      self._driver.extend_share, share_nfs, 1000)

    backend.get_stats.assert_called_once_with()
@ddt.data(share_nfs, share_cifs)
def test_manage_existing(self, share):
    """manage_existing returns the quota as size plus export locations.

    :param share: NFS or CIFS share fixture; get_share_quota is mocked
        to return the fixture's own size.
    """
    backend = ssh.HNASSSHBackend
    addresses = ((self._driver.hnas_evs_ip, False),
                 (self._driver.hnas_admin_network_ip, True))
    expected_out = {
        'size': share['size'],
        'export_locations': [
            self._get_export(share['id'], share['share_proto'], address,
                             admin_only)
            for address, admin_only in addresses],
    }
    self.mock_object(backend, "get_share_quota",
                     mock.Mock(return_value=share['size']))

    out = self._driver.manage_existing(share, 'option')

    self.assertEqual(expected_out, out)
    backend.get_share_quota.assert_called_once_with(share['id'])
def test_manage_existing_no_quota(self):
    """manage_existing raises ManageInvalidShare when the quota is None."""
    backend = ssh.HNASSSHBackend
    self.mock_object(backend, "get_share_quota",
                     mock.Mock(return_value=None))

    self.assertRaises(exception.ManageInvalidShare,
                      self._driver.manage_existing, share_nfs, 'option')

    backend.get_share_quota.assert_called_once_with(share_nfs['id'])
def test_manage_existing_wrong_share_id(self):
    """manage_existing fails when private storage returns another id."""
    stored_id = mock.Mock(return_value='Wrong_share_id')
    self.mock_object(self.fake_private_storage, 'get', stored_id)

    self.assertRaises(exception.HNASBackendException,
                      self._driver.manage_existing, share_nfs, 'option')
@ddt.data(':/', '1.1.1.1:/share_id', '1.1.1.1:/shares',
          '1.1.1.1:shares/share_id', ':/share_id')
def test_manage_existing_wrong_path_format_nfs(self, wrong_location):
    """Malformed NFS export paths are rejected by manage_existing.

    :param wrong_location: export path that does not match
        IP:/shares/share_id.
    """
    expected_msg = ("Share backend error: Incorrect path. It "
                    "should have the following format: "
                    "IP:/shares/share_id.")
    share_copy = dict(share_nfs)

    self._test_manage_existing_wrong_path(
        share_copy, expected_msg, wrong_location)
@ddt.data('\\\\1.1.1.1', '1.1.1.1\\share_id', '1.1.1.1\\shares\\share_id',
          '\\\\1.1.1.1\\shares\\share_id', '\\\\share_id')
def test_manage_existing_wrong_path_format_cifs(self, wrong_location):
    """Malformed CIFS export paths are rejected by manage_existing.

    :param wrong_location: export path that does not match the expected
        CIFS format named in the error message.
    """
    expected_msg = ("Share backend error: Incorrect path. It should "
                    "have the following format: \\\\IP\\share_id.")
    share_copy = dict(share_cifs)

    self._test_manage_existing_wrong_path(
        share_copy, expected_msg, wrong_location)
def _test_manage_existing_wrong_path(
        self, share, expected_exception, wrong_location):
    """Check that manage_existing rejects wrong_location with a message.

    :param share: share dict; its 'export_locations' entry is replaced
        in place, so callers pass a copy.
    :param expected_exception: expected text of the raised error's msg.
    :param wrong_location: export path to place on the share.
    """
    share['export_locations'] = [{'path': wrong_location}]
    call_args = (share, 'option')

    raised = self.assertRaises(exception.ShareBackendException,
                               self._driver.manage_existing, *call_args)

    self.assertEqual(expected_exception, raised.msg)
def test_manage_existing_wrong_evs_ip(self):
    """manage_existing rejects an export path with a different IP.

    The export path uses 172.24.44.189, while setUp configures
    hitachi_hnas_evs_ip as 172.24.44.10.
    """
    # Work on a copy: share_nfs is a module-level fixture shared by other
    # tests, so replacing its export_locations in place would leak the
    # wrong path into every test that runs afterwards.
    share = share_nfs.copy()
    share['export_locations'] = [{'path': '172.24.44.189:/shares/'
                                          'aa4a7710-f326-41fb-ad18-'}]

    self.assertRaises(exception.ShareBackendException,
                      self._driver.manage_existing, share,
                      'option')
def test_manage_existing_invalid_host(self):
    """manage_existing rejects a share whose host is 'invalid'."""
    call_args = (share_invalid_host, 'option')

    self.assertRaises(exception.ShareBackendException,
                      self._driver.manage_existing, *call_args)
def test_manage_existing_invalid_protocol(self):
    """manage_existing rejects a share with the 'HDFS' protocol."""
    call_args = (invalid_share, 'option')

    self.assertRaises(exception.ShareBackendException,
                      self._driver.manage_existing, *call_args)
@ddt.data(True, False)
def test_unmanage(self, has_export_locations):
    """unmanage deletes from private storage and logs at info level.

    :param has_export_locations: when False the share copy is given an
        empty export_locations list before unmanage is called.
    """
    managed_share = dict(share_nfs)
    if not has_export_locations:
        managed_share['export_locations'] = []

    self._driver.unmanage(managed_share)

    for called_flag in (self.fake_private_storage.delete.called,
                        self.mock_log.info.called):
        self.assertTrue(called_flag)
def test_get_network_allocations_number(self):
    """get_network_allocations_number returns 0."""
    allocations = self._driver.get_network_allocations_number()

    self.assertEqual(0, allocations)
@ddt.data([share_nfs, snapshot_nfs], [share_cifs, snapshot_cifs])
@ddt.unpack
def test_create_share_from_snapshot(self, share, snapshot):
    """create_share_from_snapshot clones the snapshot into a new share.

    :param share: NFS or CIFS share fixture to create.
    :param snapshot: snapshot fixture used as the clone source.
    """
    backend = ssh.HNASSSHBackend
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add", "tree_clone",
                        "cifs_share_add", "nfs_export_add"):
        self.mock_object(backend, method_name)

    result = self._driver.create_share_from_snapshot(
        'context', share, snapshot)

    backend.vvol_create.assert_called_once_with(share['id'])
    backend.quota_add.assert_called_once_with(share['id'], share['size'])
    backend.tree_clone.assert_called_once_with(
        '/snapshots/' + share['id'] + '/' + snapshot['id'],
        '/shares/' + share['id'])

    addresses = ((self._driver.hnas_evs_ip, False),
                 (self._driver.hnas_admin_network_ip, True))
    expected = [
        self._get_export(share['id'], share['share_proto'], address,
                         admin_only)
        for address, admin_only in addresses]

    if share['share_proto'].lower() != 'nfs':
        backend.cifs_share_add.assert_called_once_with(share['id'])
        self.assertFalse(backend.nfs_export_add.called)
    else:
        backend.nfs_export_add.assert_called_once_with(share['id'])
        self.assertFalse(backend.cifs_share_add.called)

    self.assertEqual(expected, result)
def test_create_share_from_snapshot_empty_snapshot(self):
    """An empty snapshot still yields a share, with a warning logged.

    tree_clone is mocked to raise HNASNothingToCloneException; the export
    locations are still expected and nfs_export_add is still called.
    """
    backend = ssh.HNASSSHBackend
    share_id = share_nfs['id']
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method_name in ("vvol_create", "quota_add"):
        self.mock_object(backend, method_name)
    self.mock_object(backend, "tree_clone", mock.Mock(
        side_effect=exception.HNASNothingToCloneException('msg')))
    self.mock_object(backend, "nfs_export_add")

    result = self._driver.create_share_from_snapshot(
        'context', share_nfs, snapshot_nfs)

    addresses = ((self._driver.hnas_evs_ip, False),
                 (self._driver.hnas_admin_network_ip, True))
    expected = [
        self._get_export(share_id, share_nfs['share_proto'], address,
                         admin_only)
        for address, admin_only in addresses]

    self.assertEqual(expected, result)
    self.assertTrue(self.mock_log.warning.called)
    backend.vvol_create.assert_called_once_with(share_id)
    backend.quota_add.assert_called_once_with(share_id,
                                              share_nfs['size'])
    backend.tree_clone.assert_called_once_with(
        '/snapshots/' + share_id + '/' + snapshot_nfs['id'],
        '/shares/' + share_id)
    backend.nfs_export_add.assert_called_once_with(share_id)
def test_create_share_from_snapshot_invalid_protocol(self):
    """``invalid_share`` makes the driver raise ShareBackendException.

    The exception message must equal ``invalid_protocol_msg``.
    """
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    for method in ("vvol_create", "quota_add", "tree_clone"):
        self.mock_object(ssh.HNASSSHBackend, method)

    raised = self.assertRaises(
        exception.ShareBackendException,
        self._driver.create_share_from_snapshot,
        'context', invalid_share, snapshot_nfs)

    self.assertEqual(invalid_protocol_msg, raised.msg)
def test_create_share_from_snapshot_cleanup(self):
    """A failing NFS export is followed by deletion of the new vvol.

    ``nfs_export_add`` is mocked to raise ``HNASBackendException``. The
    test checks that the exception reaches the caller, that the vvol,
    quota, clone and export calls were each made once, and that
    ``vvol_delete`` was called once with the share id.
    """
    # tree_clone is asserted with the snapshot directory first and the
    # share directory second, i.e. cloning from the snapshot into the
    # share. The locals are named accordingly.
    src_path = '/snapshots/' + share_nfs['id'] + '/' + snapshot_nfs['id']
    dest_path = '/shares/' + share_nfs['id']

    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    self.mock_object(ssh.HNASSSHBackend, "vvol_create")
    self.mock_object(ssh.HNASSSHBackend, "quota_add")
    self.mock_object(ssh.HNASSSHBackend, "tree_clone")
    self.mock_object(ssh.HNASSSHBackend, "vvol_delete")
    self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock(
        side_effect=exception.HNASBackendException(
            msg='Error adding nfs export.')))

    self.assertRaises(exception.HNASBackendException,
                      self._driver.create_share_from_snapshot,
                      'context', share_nfs, snapshot_nfs)

    ssh.HNASSSHBackend.vvol_create.assert_called_once_with(
        share_nfs['id'])
    ssh.HNASSSHBackend.quota_add.assert_called_once_with(
        share_nfs['id'], share_nfs['size'])
    ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
        src_path, dest_path)
    ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
        share_nfs['id'])
    ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(
        share_nfs['id'])
def test__check_fs_mounted(self):
    """``_check_fs_mounted`` calls the backend check once, no arguments."""
    self._driver._check_fs_mounted()

    backend_check = ssh.HNASSSHBackend.check_fs_mounted
    backend_check.assert_called_once_with()
def test__check_fs_mounted_not_mounted(self):
    """A False result from the backend raises HNASBackendException."""
    not_mounted = mock.Mock(return_value=False)
    self.mock_object(ssh.HNASSSHBackend, 'check_fs_mounted', not_mounted)

    self.assertRaises(
        exception.HNASBackendException,
        self._driver._check_fs_mounted)

    ssh.HNASSSHBackend.check_fs_mounted.assert_called_once_with()
def test__update_share_stats(self):
    """The stats dict handed to the base driver has the expected content.

    ``get_stats`` is mocked to return ``(1000, 200, True)``; the test
    checks the dict passed to ``ShareDriver._update_share_stats`` and
    that an info message was logged.
    """
    base_driver = manila.share.driver.ShareDriver
    expected_stats = dict(
        share_backend_name=self._driver.backend_name,
        driver_handles_share_servers=(
            self._driver.driver_handles_share_servers),
        vendor_name='Hitachi',
        driver_version='4.0.0',
        storage_protocol='NFS_CIFS',
        total_capacity_gb=1000,
        free_capacity_gb=200,
        reserved_percentage=driver.CONF.reserved_share_percentage,
        reserved_snapshot_percentage=(
            driver.CONF.reserved_share_from_snapshot_percentage),
        reserved_share_extend_percentage=(
            driver.CONF.reserved_share_extend_percentage),
        qos=False,
        thin_provisioning=True,
        dedupe=True,
        revert_to_snapshot_support=True,
        mount_snapshot_support=True,
    )

    self.mock_object(ssh.HNASSSHBackend, 'get_stats',
                     mock.Mock(return_value=(1000, 200, True)))
    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted",
                     mock.Mock())
    self.mock_object(base_driver, '_update_share_stats')

    self._driver._update_share_stats()

    self.assertTrue(self._driver.hnas.get_stats.called)
    base_driver._update_share_stats.assert_called_once_with(
        expected_stats)
    self.assertTrue(self.mock_log.info.called)
@ddt.data(snapshot_nfs, snapshot_cifs,
          snapshot_mount_support_nfs, snapshot_mount_support_cifs)
def test_ensure_snapshot(self, snapshot):
    """``ensure_snapshot`` returns exports only for mountable snapshots.

    When the parent share has ``mount_snapshot_support`` the expected
    result is the pair of exports built by ``_get_export``; otherwise
    it is None. The snapshot directory is checked in both cases.
    """
    backend = ssh.HNASSSHBackend

    result = self._driver.ensure_snapshot('context', snapshot)

    expected = None
    if snapshot['share'].get('mount_snapshot_support'):
        protocol = snapshot['share']['share_proto']
        expected = [
            self._get_export(
                snapshot['id'], protocol,
                self._driver.hnas_evs_ip, False, is_snapshot=True),
            self._get_export(
                snapshot['id'], protocol,
                self._driver.hnas_admin_network_ip, True,
                is_snapshot=True),
        ]

        # Only the check matching the protocol may have been called.
        if protocol.lower() == 'nfs':
            backend.check_export.assert_called_once_with(
                snapshot['id'], is_snapshot=True)
            self.assertFalse(backend.check_cifs.called)
        else:
            backend.check_cifs.assert_called_once_with(snapshot['id'])
            self.assertFalse(backend.check_export.called)

    backend.check_directory.assert_called_once_with(
        snapshot['provider_location'])
    self.assertEqual(expected, result)
def test_manage_existing_snapshot(self):
    """Managing ``manage_snapshot`` reports the size from driver options.

    Also checks the directory lookup, the ``_ensure_snapshot`` call and
    that debug and info messages were logged.
    """
    self.mock_object(ssh.HNASSSHBackend, 'check_directory',
                     mock.Mock(return_value=True))
    self.mock_object(self._driver, '_ensure_snapshot',
                     mock.Mock(return_value=[]))

    # The snapshot id is the fourth '/'-separated piece of the location.
    hnas_snapshot_id = manage_snapshot['provider_location'].split('/')[3]

    managed = self._driver.manage_existing_snapshot(
        manage_snapshot, {'size': 20})

    ssh.HNASSSHBackend.check_directory.assert_called_with(
        '/snapshots/aa4a7710-f326-41fb-ad18-b4ad587fc87a'
        '/snapshot18-05-2106')
    self._driver._ensure_snapshot.assert_called_with(
        manage_snapshot, hnas_snapshot_id)
    self.assertEqual(20, managed['size'])
    for log_call in (self.mock_log.debug, self.mock_log.info):
        self.assertTrue(log_call.called)
@ddt.data(None, exception.HNASItemNotFoundException('Fake error.'))
def test_manage_existing_snapshot_with_mount_support(self, exc):
    """Manage a mountable NFS snapshot and check the returned exports.

    When ``exc`` is set, ``_ensure_snapshot`` is mocked to raise it and
    the test additionally expects ``_create_export`` to be called.
    """
    snapshot = snapshot_mount_support_nfs
    protocol = snapshot['share']['share_proto']
    export_locations = [{
        'path': '172.24.44.10:/snapshots/'
                '3377b015-a695-4a5a-8aa5-9b931b023380'}]

    self.mock_object(ssh.HNASSSHBackend, 'check_directory',
                     mock.Mock(return_value=True))
    self.mock_object(self._driver, '_ensure_snapshot',
                     mock.Mock(return_value=[], side_effect=exc))
    self.mock_object(self._driver, '_get_export_locations',
                     mock.Mock(return_value=export_locations))
    if exc:
        self.mock_object(self._driver, '_create_export')

    # The snapshot id is the fourth '/'-separated piece of the location.
    hnas_snapshot_id = snapshot['provider_location'].split('/')[3]

    managed = self._driver.manage_existing_snapshot(
        snapshot, {'size': 20, 'export_locations': export_locations})

    ssh.HNASSSHBackend.check_directory.assert_called_with(
        '/snapshots/62125744-fcdd-4f55-a8c1-d1498102f634'
        '/3377b015-a695-4a5a-8aa5-9b931b023380')
    self._driver._ensure_snapshot.assert_called_with(
        snapshot, hnas_snapshot_id)
    self._driver._get_export_locations.assert_called_with(
        protocol, hnas_snapshot_id, is_snapshot=True)
    if exc:
        self._driver._create_export.assert_called_with(
            snapshot['share_id'], protocol,
            snapshot_id=hnas_snapshot_id)
    self.assertEqual(20, managed['size'])
    self.assertEqual(export_locations, managed['export_locations'])
    self.assertTrue(self.mock_log.debug.called)
    self.assertTrue(self.mock_log.info.called)
@ddt.data('fake_size', '128GB', '512 GB', {'size': 128})
def test_manage_snapshot_invalid_size_exception(self, size):
    """Each listed size value raises ManageInvalidShareSnapshot."""
    driver_options = {'size': size}

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        manage_snapshot, driver_options)
def test_manage_snapshot_size_not_provided_exception(self):
    """Empty driver options raise ManageInvalidShareSnapshot."""
    no_options = {}

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        manage_snapshot, no_options)
@ddt.data('/root/snapshot_id', '/snapshots/share1/snapshot_id',
          '/directory1', 'snapshots/share1/snapshot_id')
def test_manage_snapshot_invalid_path_exception(self, path):
    """Each listed provider location raises ManageInvalidShareSnapshot.

    The module-level fixture is left untouched; the path is set on a
    copy. A debug message is expected to have been logged.
    """
    snapshot = dict(manage_snapshot, provider_location=path)

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        snapshot, {'size': 20})

    self.assertTrue(self.mock_log.debug.called)
def test_manage_inexistent_snapshot_exception(self):
    """A False ``check_directory`` raises ManageInvalidShareSnapshot.

    A debug message is expected to have been logged.
    """
    directory_missing = mock.Mock(return_value=False)
    self.mock_object(ssh.HNASSSHBackend, 'check_directory',
                     directory_missing)

    self.assertRaises(
        exception.ManageInvalidShareSnapshot,
        self._driver.manage_existing_snapshot,
        manage_snapshot, {'size': 20})

    self.assertTrue(self.mock_log.debug.called)
def test_unmanage_snapshot(self):
    """Unmanaging ``snapshot_nfs`` is expected to log an info message."""
    self._driver.unmanage_snapshot(snapshot_nfs)

    info_logged = self.mock_log.info.called
    self.assertTrue(info_logged)
@ddt.data({'snap': snapshot_nfs, 'exc': None},
          {'snap': snapshot_cifs, 'exc': None},
          {'snap': snapshot_nfs,
           'exc': exception.HNASNothingToCloneException('fake')},
          {'snap': snapshot_cifs,
           'exc': exception.HNASNothingToCloneException('fake')})
@ddt.unpack
def test_revert_to_snapshot(self, exc, snap):
    """Revert a share to a snapshot, with and without a clone error.

    ``tree_clone`` is mocked with ``exc`` as its side effect. When
    ``exc`` is set, a warning is expected to have been logged.
    """
    backend = ssh.HNASSSHBackend
    share_id = snap['share_id']
    share_path = '/'.join(('/shares', share_id))
    snapshot_path = '/'.join(('/snapshots', share_id, snap['id']))

    self.mock_object(driver.HitachiHNASDriver, "_check_fs_mounted")
    for method in ('tree_delete', 'vvol_create', 'quota_add'):
        self.mock_object(backend, method)
    self.mock_object(backend, 'tree_clone', mock.Mock(side_effect=exc))

    self._driver.revert_to_snapshot('context', snap, None, None)

    driver.HitachiHNASDriver._check_fs_mounted.assert_called_once_with()
    backend.tree_delete.assert_called_once_with(share_path)
    backend.vvol_create.assert_called_once_with(share_id)
    # NOTE(review): 2 is presumably the size of the snapshot's parent
    # share fixture - confirm against the fixture definitions.
    backend.quota_add.assert_called_once_with(share_id, 2)
    backend.tree_clone.assert_called_once_with(snapshot_path, share_path)
    backend.check_directory.assert_called_once_with(
        snap['provider_location'])

    if exc:
        self.assertTrue(self.mock_log.warning.called)
    self.assertTrue(self.mock_log.info.called)
def test_nfs_snapshot_update_access_allow(self):
    """Adding two IP rules to an NFS snapshot updates the export once.

    Each address is expected to be passed to the backend with a
    ``(ro)`` suffix.
    """
    backend = ssh.HNASSSHBackend
    access_list = [
        {'access_type': 'ip', 'access_to': '172.24.10.10'},
        {'access_type': 'ip', 'access_to': '172.31.20.20'},
    ]

    self.mock_object(backend, "update_nfs_access_rule")

    self._driver.snapshot_update_access(
        'ctxt', snapshot_nfs, access_list, access_list, [])

    expected_hosts = [rule['access_to'] + '(ro)' for rule in access_list]
    backend.update_nfs_access_rule.assert_called_once_with(
        expected_hosts, snapshot_id=snapshot_nfs['id'])
    backend.check_directory.assert_called_once_with(
        snapshot_nfs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)
def test_nfs_snapshot_update_access_deny(self):
    """Removing the only IP rule updates the export with an empty list."""
    backend = ssh.HNASSSHBackend
    removed_rule = {
        'access_type': 'ip',
        'access_to': '172.24.10.10',
    }

    self.mock_object(backend, "update_nfs_access_rule")

    self._driver.snapshot_update_access(
        'ctxt', snapshot_nfs, [], [], [removed_rule])

    backend.update_nfs_access_rule.assert_called_once_with(
        [], snapshot_id=snapshot_nfs['id'])
    backend.check_directory.assert_called_once_with(
        snapshot_nfs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)
def test_nfs_snapshot_update_access_invalid_access_type(self):
    """A ``user`` rule on an NFS snapshot raises InvalidSnapshotAccess.

    The snapshot directory is still expected to be checked once.
    """
    user_rule = {
        'access_type': 'user',
        'access_to': 'user1',
    }

    self.assertRaises(
        exception.InvalidSnapshotAccess,
        self._driver.snapshot_update_access,
        'ctxt', snapshot_nfs, [user_rule], [], [])

    ssh.HNASSSHBackend.check_directory.assert_called_once_with(
        snapshot_nfs['provider_location'])
def test_cifs_snapshot_update_access_allow(self):
    """Adding a user rule to a CIFS snapshot allows it with 'ar'."""
    backend = ssh.HNASSSHBackend
    user_rule = {
        'access_type': 'user',
        'access_to': 'fake_user1',
    }

    self.mock_object(backend, 'cifs_allow_access')

    self._driver.snapshot_update_access(
        'ctxt', snapshot_cifs, [user_rule], [user_rule], [])

    backend.cifs_allow_access.assert_called_with(
        snapshot_cifs['id'], user_rule['access_to'], 'ar',
        is_snapshot=True)
    backend.check_directory.assert_called_once_with(
        snapshot_cifs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)
def test_cifs_snapshot_update_access_deny(self):
    """Removing a user rule from a CIFS snapshot denies that user."""
    backend = ssh.HNASSSHBackend
    user_rule = {
        'access_type': 'user',
        'access_to': 'fake_user1',
    }

    self.mock_object(backend, 'cifs_deny_access')

    self._driver.snapshot_update_access(
        'ctxt', snapshot_cifs, [], [], [user_rule])

    backend.cifs_deny_access.assert_called_with(
        snapshot_cifs['id'], user_rule['access_to'], is_snapshot=True)
    backend.check_directory.assert_called_once_with(
        snapshot_cifs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)
def test_cifs_snapshot_update_access_recovery_mode(self):
    """Full rule list with no add/delete rules on a CIFS snapshot.

    ``list_cifs_permissions`` is mocked to return both users. The test
    checks the last deny and allow calls, which are for the
    domain-qualified user, and the escaping applied to that name.
    """
    backend = ssh.HNASSSHBackend
    plain_user = {
        'access_type': 'user',
        'access_to': 'fake_user1',
    }
    domain_user = {
        'access_type': 'user',
        'access_to': 'HDS\\fake_user2',
    }
    access_list = [plain_user, domain_user]
    permission_list = [('fake_user1', 'ar'), ('HDS\\fake_user2', 'ar')]
    # Name wrapped in backslash-escaped double quotes for the deny call.
    formatted_user = r'"\{1}{0}\{1}"'.format(domain_user['access_to'], '"')
    # Name with its backslash doubled for the allow call.
    escaped_user = domain_user['access_to'].replace('\\', '\\\\')

    self.mock_object(backend, 'list_cifs_permissions',
                     mock.Mock(return_value=permission_list))
    self.mock_object(backend, 'cifs_deny_access')
    self.mock_object(backend, 'cifs_allow_access')

    self._driver.snapshot_update_access(
        'ctxt', snapshot_cifs, access_list, [], [])

    backend.list_cifs_permissions.assert_called_once_with(
        snapshot_cifs['id'])
    backend.cifs_deny_access.assert_called_with(
        snapshot_cifs['id'], formatted_user, is_snapshot=True)
    backend.cifs_allow_access.assert_called_with(
        snapshot_cifs['id'], escaped_user, 'ar', is_snapshot=True)
    backend.check_directory.assert_called_once_with(
        snapshot_cifs['provider_location'])
    self.assertTrue(self.mock_log.debug.called)