Coverage for manila/tests/share/drivers/dell_emc/plugins/powerstore/test_connection.py: 100%
342 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2023 Dell Inc. or its subsidiaries.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
17from unittest import mock
19from oslo_log import log
20from oslo_utils import units
22from manila.common import constants as const
23from manila import exception
24from manila.share.drivers.dell_emc.plugins.powerstore import connection
25from manila import test
27LOG = log.getLogger(__name__)
class PowerStoreTest(test.TestCase):
    """Unit tests for the PowerStore Manila storage connection.

    ``setUp`` builds a ``PowerStoreStorageConnection`` while
    ``PowerStoreClient`` is patched with an autospec'd mock, and keeps the
    mocked client instance in ``self._mock_powerstore_client`` so each test
    can stub return values and verify the calls made by the connection.
    """

    REST_IP = "192.168.0.110"
    NAS_SERVER_NAME = "powerstore-nasserver"
    NAS_SERVER_ID = "6423d56e-eaf3-7424-be0b-1a9efb93188b"
    NAS_SERVER_IP = "192.168.11.23"
    SHARE_NAME = "powerstore-share"
    SHARE_SIZE_GB = 3
    SHARE_NEW_SIZE_GB = 6
    FILESYSTEM_ID = "6454e9a9-a698-e9bc-ca61-1a9efb93188b"
    NFS_EXPORT_ID = "6454ec18-7b8d-1532-1b8a-1a9efb93188b"
    SMB_SHARE_ID = "64927ae9-3403-6930-a784-f227b9987c54"
    RW_HOSTS = "192.168.1.10"
    RO_HOSTS = "192.168.1.11"
    RW_USERS = "user_1"
    RO_USERS = "user_2"
    SNAPSHOT_NAME = "powerstore-share-snap"
    SNAPSHOT_ID = "6454ea29-09c3-030e-cfc3-1a9efb93188b"
    CLONE_ID = "64560f05-e677-ec2a-7fcf-1a9efb93188b"
    CLONE_NAME = "powerstore-nfs-share-snap-clone"
    # Access rule ids used by the update_access tests.
    ACCESS_ID = "09960614-8574-4e03-89cf-7cf267b0bd08"
    ALT_ACCESS_ID = "09960614-8574-4e03-89cf-7cf267b0bd09"

    class MockConfig(object):
        """Minimal stand-in for the driver configuration object."""

        def safe_get(self, value):
            """Return the canned setting for option ``value``.

            :param value: name of the configuration option.
            :returns: the canned value, or ``None`` for an unknown option.
            """
            # Resolved at call time so the canned values stay in sync with
            # the constants declared on the enclosing test class.
            options = {
                "dell_nas_backend_host": PowerStoreTest.REST_IP,
                "dell_nas_login": "admin",
                "dell_nas_password": "pwd",
                "dell_nas_server": PowerStoreTest.NAS_SERVER_NAME,
                "dell_ad_domain": "domain_name",
                "dell_ssl_cert_verify": True,
                "dell_ssl_cert_path": "powerstore_cert_path",
            }
            return options.get(value)

    @mock.patch(
        "manila.share.drivers.dell_emc.plugins.powerstore.client."
        "PowerStoreClient",
        autospec=True,
    )
    def setUp(self, mock_powerstore_client):
        """Create a connected storage connection backed by a mocked client.

        :param mock_powerstore_client: autospec'd ``PowerStoreClient`` class
            injected by ``mock.patch``; its ``return_value`` is the client
            instance the tests stub and inspect.
        """
        super().setUp()

        self._mock_powerstore_client = mock_powerstore_client.return_value
        self.storage_connection = connection.PowerStoreStorageConnection(LOG)

        # ``name=`` is required here: the first positional parameter of
        # Mock is ``spec``, so Mock("Context") would spec the mock on str.
        self.mock_context = mock.Mock(name="Context")
        self.mock_emc_driver = mock.Mock(name="EmcDriver")

        self._mock_config = self.MockConfig()
        self.mock_emc_driver.attach_mock(self._mock_config, "configuration")
        self.storage_connection.connect(
            self.mock_emc_driver, self.mock_context
        )

    def _access_rules(self, access_type, rw_to, ro_to, ro_access_id=None):
        """Build a [read-write, read-only] pair of access rule dicts.

        :param access_type: value for the ``access_type`` key of both rules.
        :param rw_to: ``access_to`` of the read-write rule.
        :param ro_to: ``access_to`` of the read-only rule.
        :param ro_access_id: ``access_id`` of the read-only rule; defaults
            to the same id as the read-write rule.
        :returns: list with the read-write rule followed by the read-only
            rule.
        """
        return [
            {
                "access_type": access_type,
                "access_to": rw_to,
                "access_level": const.ACCESS_LEVEL_RW,
                "access_id": self.ACCESS_ID,
            },
            {
                "access_type": access_type,
                "access_to": ro_to,
                "access_level": const.ACCESS_LEVEL_RO,
                "access_id": ro_access_id or self.ACCESS_ID,
            },
        ]

    def _update_access(self, share, access_rules):
        """Call ``update_access`` with no add/delete rules or share server.

        :returns: whatever ``update_access`` returns.
        """
        return self.storage_connection.update_access(
            self.mock_context,
            share,
            access_rules,
            add_rules=None,
            delete_rules=None,
            share_server=None,
        )

    def test_connect(self):
        """connect() copies the configuration values onto the connection."""
        storage_connection = connection.PowerStoreStorageConnection(LOG)

        # execute method under test
        # NOTE(review): the patch applied to setUp is only active while
        # setUp runs, so PowerStoreClient is not patched during this call.
        storage_connection.connect(self.mock_emc_driver, self.mock_context)

        # verify connect sets driver params appropriately
        mock_config = self.MockConfig()
        server_addr = mock_config.safe_get("dell_nas_backend_host")
        self.assertEqual(server_addr, storage_connection.rest_ip)
        expected_username = mock_config.safe_get("dell_nas_login")
        self.assertEqual(expected_username, storage_connection.rest_username)
        expected_password = mock_config.safe_get("dell_nas_password")
        self.assertEqual(expected_password, storage_connection.rest_password)
        expected_nas_server = mock_config.safe_get("dell_nas_server")
        self.assertEqual(expected_nas_server, storage_connection.nas_server)
        expected_ad_domain = mock_config.safe_get("dell_ad_domain")
        self.assertEqual(expected_ad_domain, storage_connection.ad_domain)
        expected_verify_certificate = mock_config.safe_get(
            "dell_ssl_cert_verify"
        )
        self.assertEqual(
            expected_verify_certificate, storage_connection.verify_certificate
        )

    def test_create_share_nfs(self):
        """Creating an NFS share returns an ``ip:/name`` export location."""
        client = self._mock_powerstore_client
        client.get_nas_server_id.return_value = self.NAS_SERVER_ID
        client.create_filesystem.return_value = self.FILESYSTEM_ID
        client.create_nfs_export.return_value = self.NFS_EXPORT_ID
        client.get_nas_server_interfaces.return_value = [
            {"ip": self.NAS_SERVER_IP, "preferred": True}
        ]

        self.assertFalse(client.get_nas_server_id.called)
        self.assertFalse(client.create_filesystem.called)
        self.assertFalse(client.create_nfs_export.called)
        self.assertFalse(client.get_nas_server_interfaces.called)

        # create the share
        share = {"name": self.SHARE_NAME, "share_proto": "NFS",
                 "size": self.SHARE_SIZE_GB}
        locations = self.storage_connection.create_share(
            self.mock_context,
            share,
            None
        )

        # verify location and API call made
        expected_locations = [
            {"path": "%s:/%s" % (self.NAS_SERVER_IP, self.SHARE_NAME),
             "metadata": {"preferred": True}}
        ]
        self.assertEqual(expected_locations, locations)
        client.get_nas_server_id.assert_called_with(
            self._mock_config.safe_get("dell_nas_server")
        )
        client.create_filesystem.assert_called_with(
            self.NAS_SERVER_ID,
            self.SHARE_NAME,
            self.SHARE_SIZE_GB * units.Gi,
        )
        client.create_nfs_export.assert_called_with(
            self.FILESYSTEM_ID, self.SHARE_NAME
        )
        client.get_nas_server_interfaces.assert_called_with(
            self.NAS_SERVER_ID
        )

    def test_create_share_cifs(self):
        r"""Creating a CIFS share returns a ``\\ip\name`` export location."""
        client = self._mock_powerstore_client
        client.get_nas_server_id.return_value = self.NAS_SERVER_ID
        client.create_filesystem.return_value = self.FILESYSTEM_ID
        client.create_smb_share.return_value = self.SMB_SHARE_ID
        client.get_nas_server_interfaces.return_value = [
            {"ip": self.NAS_SERVER_IP, "preferred": True}
        ]

        self.assertFalse(client.get_nas_server_id.called)
        self.assertFalse(client.create_filesystem.called)
        self.assertFalse(client.create_smb_share.called)
        self.assertFalse(client.get_nas_server_interfaces.called)

        # create the share
        share = {"name": self.SHARE_NAME, "share_proto": "CIFS",
                 "size": self.SHARE_SIZE_GB}
        locations = self.storage_connection.create_share(
            self.mock_context,
            share,
            None
        )

        # verify location and API call made
        expected_locations = [
            {"path": "\\\\%s\\%s" % (self.NAS_SERVER_IP, self.SHARE_NAME),
             "metadata": {"preferred": True}}
        ]
        self.assertEqual(expected_locations, locations)
        client.get_nas_server_id.assert_called_with(
            self._mock_config.safe_get("dell_nas_server")
        )
        client.create_filesystem.assert_called_with(
            self.NAS_SERVER_ID,
            self.SHARE_NAME,
            self.SHARE_SIZE_GB * units.Gi,
        )
        client.create_smb_share.assert_called_with(
            self.FILESYSTEM_ID, self.SHARE_NAME
        )
        client.get_nas_server_interfaces.assert_called_with(
            self.NAS_SERVER_ID
        )

    def test_create_share_filesystem_id_not_found(self):
        """create_share raises when create_filesystem returns None."""
        share = {"name": self.SHARE_NAME, "share_proto": "NFS",
                 "size": self.SHARE_SIZE_GB}
        self._mock_powerstore_client.create_filesystem.return_value = None

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_share,
            self.mock_context,
            share,
            share_server=None
        )

    def test_create_share_nfs_backend_failure(self):
        """create_share raises when create_nfs_export returns None."""
        share = {"name": self.SHARE_NAME, "share_proto": "NFS",
                 "size": self.SHARE_SIZE_GB}
        self._mock_powerstore_client.create_nfs_export.return_value = None

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_share,
            self.mock_context,
            share,
            share_server=None
        )

    def test_create_share_cifs_backend_failure(self):
        """create_share raises when create_smb_share returns None."""
        share = {"name": self.SHARE_NAME, "share_proto": "CIFS",
                 "size": self.SHARE_SIZE_GB}
        self._mock_powerstore_client.create_smb_share.return_value = None

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_share,
            self.mock_context,
            share,
            share_server=None
        )

    def test_delete_share_nfs(self):
        """delete_share looks the filesystem up by name and deletes it."""
        client = self._mock_powerstore_client
        share = {"name": self.SHARE_NAME, "share_proto": "NFS"}

        client.get_filesystem_id.return_value = self.FILESYSTEM_ID

        self.assertFalse(client.get_filesystem_id.called)
        self.assertFalse(client.delete_filesystem.called)

        # delete the share
        self.storage_connection.delete_share(self.mock_context, share, None)

        # verify share delete
        client.get_filesystem_id.assert_called_with(self.SHARE_NAME)
        client.delete_filesystem.assert_called_with(self.FILESYSTEM_ID)

    def test_delete_nfs_share_backend_failure(self):
        """delete_share raises when delete_filesystem returns False."""
        share = {"name": self.SHARE_NAME, "share_proto": "NFS"}

        self._mock_powerstore_client.delete_filesystem.return_value = False
        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.delete_share,
            self.mock_context,
            share,
            None,
        )

    def test_delete_nfs_share_share_does_not_exist(self):
        """Deleting an unknown share is a no-op instead of an error."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = None
        share = {"name": self.SHARE_NAME, "share_proto": "NFS"}

        # verify the calling delete on a non-existent share returns and does
        # not throw exception
        self.storage_connection.delete_share(self.mock_context, share, None)
        self.assertTrue(client.get_filesystem_id.called)
        self.assertFalse(client.delete_filesystem.called)

    def test_extend_share(self):
        """extend_share resizes the filesystem to the new size in bytes."""
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "NFS",
            "size": self.SHARE_NEW_SIZE_GB,
        }
        client.get_filesystem_id.return_value = self.FILESYSTEM_ID
        client.resize_filesystem.return_value = (True, None)
        self.assertFalse(client.get_filesystem_id.called)

        self.storage_connection.extend_share(share, self.SHARE_NEW_SIZE_GB,
                                             self.NAS_SERVER_NAME)

        client.get_filesystem_id.assert_called_with(self.SHARE_NAME)
        expected_quota_size = self.SHARE_NEW_SIZE_GB * units.Gi
        client.resize_filesystem.assert_called_once_with(
            self.FILESYSTEM_ID, expected_quota_size
        )

    def test_shrink_share(self):
        """shrink_share resizes the filesystem to the new size in bytes."""
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "NFS",
            "size": self.SHARE_SIZE_GB,
        }
        client.get_filesystem_id.return_value = self.FILESYSTEM_ID
        client.resize_filesystem.return_value = (True, None)
        self.assertFalse(client.get_filesystem_id.called)

        # NOTE(review): the requested size (SHARE_NEW_SIZE_GB) is larger
        # than share["size"]; only the forwarded byte count is checked here.
        self.storage_connection.shrink_share(share, self.SHARE_NEW_SIZE_GB,
                                             self.NAS_SERVER_NAME)

        client.get_filesystem_id.assert_called_with(self.SHARE_NAME)
        expected_quota_size = self.SHARE_NEW_SIZE_GB * units.Gi
        client.resize_filesystem.assert_called_once_with(
            self.FILESYSTEM_ID, expected_quota_size
        )

    def test_shrink_share_failure(self):
        """A failed resize that carries a message maps to possible data loss.

        ``resize_filesystem`` is stubbed to return ``(False, "msg")``.
        """
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "NFS",
            "size": self.SHARE_SIZE_GB,
            "id": self.CLONE_ID
        }
        client.get_filesystem_id.return_value = self.FILESYSTEM_ID
        client.resize_filesystem.return_value = (False, "msg")

        self.assertRaises(
            exception.ShareShrinkingPossibleDataLoss,
            self.storage_connection.shrink_share,
            share,
            self.SHARE_NEW_SIZE_GB,
            self.NAS_SERVER_NAME
        )

    def test_shrink_share_backend_failure(self):
        """A failed resize without a message maps to a backend exception.

        ``resize_filesystem`` is stubbed to return ``(False, None)``.
        """
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "NFS",
            "size": self.SHARE_SIZE_GB,
        }
        client.get_filesystem_id.return_value = self.FILESYSTEM_ID
        client.resize_filesystem.return_value = (False, None)

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.shrink_share,
            share,
            self.SHARE_NEW_SIZE_GB,
            self.NAS_SERVER_NAME
        )

    def test_update_access_add_nfs(self):
        """IP rules are split into RW and RO host sets for the NFS export."""
        client = self._mock_powerstore_client
        share = {"name": self.SHARE_NAME, "share_proto": "NFS"}

        client.get_nfs_export_id.return_value = self.NFS_EXPORT_ID
        client.set_export_access.return_value = True

        self.assertFalse(client.get_nfs_export_id.called)
        self.assertFalse(client.set_export_access.called)

        access_rules = self._access_rules(
            "ip", self.RW_HOSTS, self.RO_HOSTS
        )

        self._update_access(share, access_rules)

        client.get_nfs_export_id.assert_called_once_with(self.SHARE_NAME)
        client.set_export_access.assert_called_once_with(
            self.NFS_EXPORT_ID, {self.RW_HOSTS}, {self.RO_HOSTS}
        )

    def test_update_access_add_cifs(self):
        """User rules are domain-prefixed and applied as the SMB share ACL."""
        client = self._mock_powerstore_client
        share = {"name": self.SHARE_NAME, "share_proto": "CIFS"}

        client.get_smb_share_id.return_value = self.SMB_SHARE_ID
        client.set_acl.return_value = True

        self.assertFalse(client.get_smb_share_id.called)
        self.assertFalse(client.set_acl.called)

        access_rules = self._access_rules(
            "user", self.RW_USERS, self.RO_USERS
        )

        self._update_access(share, access_rules)

        client.get_smb_share_id.assert_called_once_with(self.SHARE_NAME)
        client.set_acl.assert_called_once_with(
            self.SMB_SHARE_ID, {'domain_name\\user_1'}, {'domain_name\\user_2'}
        )

    def test_update_access_invalid_prefix(self):
        """With no AD domain and no NetBIOS name, no user reaches the ACL."""
        client = self._mock_powerstore_client
        share = {"name": self.SHARE_NAME, "share_proto": "CIFS"}

        client.get_smb_share_id.return_value = self.SMB_SHARE_ID
        client.get_nas_server_smb_netbios.return_value = None

        self.assertFalse(client.get_smb_share_id.called)
        self.assertFalse(client.set_acl.called)

        access_rules = self._access_rules(
            "user", self.RW_USERS, self.RO_USERS
        )

        self.storage_connection.ad_domain = None

        access_updates = self._update_access(share, access_rules)

        client.set_acl.assert_called_once_with(
            self.SMB_SHARE_ID, set(), set()
        )

        self.assertIsNotNone(access_updates)

    def test_update_access_add_nfs_invalid_acess_type(self):
        """NFS rules with an unsupported access type are not applied."""
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "NFS",
            "display_name": "foo_display_name",
        }

        access_rules = self._access_rules(
            "invalid_type", self.RW_HOSTS, self.RO_HOSTS,
            ro_access_id=self.ALT_ACCESS_ID,
        )

        client.get_nfs_export_id.return_value = self.NFS_EXPORT_ID

        access_updates = self._update_access(share, access_rules)

        client.set_export_access.assert_called_once_with(
            self.NFS_EXPORT_ID, set(), set()
        )

        self.assertIsNotNone(access_updates)

    def test_update_access_add_cifs_invalid_acess_type(self):
        """CIFS rules with an unsupported access type are not applied."""
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "CIFS",
            "display_name": "foo_display_name",
        }

        access_rules = self._access_rules(
            "invalid_type", self.RW_USERS, self.RO_USERS,
            ro_access_id=self.ALT_ACCESS_ID,
        )

        client.get_smb_share_id.return_value = self.SMB_SHARE_ID

        access_updates = self._update_access(share, access_rules)

        client.set_acl.assert_called_once_with(
            self.SMB_SHARE_ID, set(), set()
        )

        self.assertIsNotNone(access_updates)

    def test_update_access_add_nfs_backend_failure(self):
        """update_access raises when set_export_access returns False."""
        client = self._mock_powerstore_client
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "NFS",
            "display_name": "foo_display_name",
        }

        client.get_nfs_export_id.return_value = self.NFS_EXPORT_ID
        client.set_export_access.return_value = False

        self.assertFalse(client.get_nfs_export_id.called)
        self.assertFalse(client.set_export_access.called)

        access_rules = self._access_rules(
            "ip", self.RW_HOSTS, self.RO_HOSTS
        )

        self.assertRaises(
            exception.ShareBackendException,
            self._update_access,
            share,
            access_rules,
        )

    def test_update_access_add_cifs_backend_failure(self):
        """update_access raises when set_acl returns False."""
        share = {
            "name": self.SHARE_NAME,
            "share_proto": "CIFS",
            "display_name": "foo_display_name",
        }

        self._mock_powerstore_client.set_acl.return_value = False

        access_rules = self._access_rules(
            "user", self.RW_USERS, self.RO_USERS
        )

        self.assertRaises(
            exception.ShareBackendException,
            self._update_access,
            share,
            access_rules,
        )

    def test_allow_access(self):
        """allow_access is not implemented by this connection."""
        self.assertRaises(
            NotImplementedError,
            self.storage_connection.allow_access,
            self.mock_context,
            share=None,
            access=None,
            share_server=None,
        )

    def test_deny_access(self):
        """deny_access is not implemented by this connection."""
        self.assertRaises(
            NotImplementedError,
            self.storage_connection.deny_access,
            self.mock_context,
            share=None,
            access=None,
            share_server=None,
        )

    def test_update_share_stats(self):
        """update_share_stats fills in version and capacity figures."""
        client = self._mock_powerstore_client
        data = dict(
            share_backend_name='powerstore',
            vendor_name='Dell EMC',
            storage_protocol='NFS_CIFS',
            snapshot_support=True,
            create_share_from_snapshot_support=True)
        client.get_cluster_id.return_value = "0"
        # NOTE: the "retreive" spelling is kept on purpose; it has to match
        # the attribute name exposed by the autospec'd client.
        client.retreive_cluster_capacity_metrics.return_value = (
            47345047046144, 366003363027
        )
        self.storage_connection.update_share_stats(data)
        self.assertEqual('NFS_CIFS', data['storage_protocol'])
        self.assertEqual(connection.VERSION, data['driver_version'])
        self.assertEqual(44093, data['total_capacity_gb'])
        self.assertEqual(43752, data['free_capacity_gb'])

    def test_update_share_stats_failure(self):
        """No capacity keys are set when the metrics come back as None."""
        client = self._mock_powerstore_client
        data = dict(
            share_backend_name='powerstore',
            vendor_name='Dell EMC',
            storage_protocol='NFS_CIFS',
            snapshot_support=True,
            create_share_from_snapshot_support=True)
        client.retreive_cluster_capacity_metrics.return_value = (None, None)
        self.storage_connection.update_share_stats(data)
        self.assertIsNone(data.get('total_capacity_gb'))
        self.assertIsNone(data.get('free_capacity_gb'))

    def test_create_snapshot(self):
        """create_snapshot snapshots the filesystem of the parent share."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = self.FILESYSTEM_ID
        client.create_snapshot.return_value = self.SNAPSHOT_ID

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME
        }
        self.storage_connection.create_snapshot(
            self.mock_context, snapshot, None
        )

        client.get_filesystem_id.assert_called_with(self.SHARE_NAME)
        client.create_snapshot.assert_called_with(
            self.FILESYSTEM_ID, self.SNAPSHOT_NAME
        )

    def test_create_snapshot_invalid_filesystem_id(self):
        """create_snapshot raises when the share filesystem is not found."""
        self._mock_powerstore_client.get_filesystem_id.return_value = None

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME
        }
        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_snapshot,
            self.mock_context,
            snapshot,
            None
        )

    def test_create_snapshot_backend_failure(self):
        """create_snapshot raises when the client returns no snapshot id."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = self.FILESYSTEM_ID
        client.create_snapshot.return_value = None

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME,
            "share": {
                "share_proto": "NFS"
            }
        }
        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_snapshot,
            self.mock_context,
            snapshot,
            None
        )

    def test_delete_snapshot(self):
        """delete_snapshot looks the snapshot up by name and deletes it."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = self.SNAPSHOT_ID
        client.delete_filesystem.return_value = True

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME
        }
        self.storage_connection.delete_snapshot(
            self.mock_context, snapshot, None
        )

        client.get_filesystem_id.assert_called_with(self.SNAPSHOT_NAME)
        client.delete_filesystem.assert_called_with(self.SNAPSHOT_ID)

    def test_delete_snapshot_backend_failure(self):
        """delete_snapshot raises when delete_filesystem returns False."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = self.SNAPSHOT_ID
        client.delete_filesystem.return_value = False

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME
        }
        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.delete_snapshot,
            self.mock_context,
            snapshot,
            None
        )

    def test_revert_to_snapshot(self):
        """revert_to_snapshot restores the snapshot found by its name."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = self.SNAPSHOT_ID
        client.restore_snapshot.return_value = True

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME
        }
        self.storage_connection.revert_to_snapshot(
            self.mock_context, snapshot, None, None, None
        )

        client.get_filesystem_id.assert_called_with(self.SNAPSHOT_NAME)
        client.restore_snapshot.assert_called_with(self.SNAPSHOT_ID)

    def test_revert_to_snapshot_backend_failure(self):
        """revert_to_snapshot raises when restore_snapshot returns False."""
        client = self._mock_powerstore_client
        client.get_filesystem_id.return_value = self.SNAPSHOT_ID
        client.restore_snapshot.return_value = False

        snapshot = {
            "name": self.SNAPSHOT_NAME,
            "share_name": self.SHARE_NAME
        }
        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.revert_to_snapshot,
            self.mock_context,
            snapshot,
            None, None, None
        )

    def test_create_share_from_snapshot_nfs(self):
        """Cloning a snapshot into an NFS share exports the clone."""
        client = self._mock_powerstore_client
        client.get_nas_server_id.return_value = self.NAS_SERVER_ID
        client.get_filesystem_id.return_value = self.SNAPSHOT_ID
        client.clone_snapshot.return_value = self.CLONE_ID
        client.create_nfs_export.return_value = self.NFS_EXPORT_ID
        client.get_nas_server_interfaces.return_value = [
            {"ip": self.NAS_SERVER_IP, "preferred": True}
        ]
        client.resize_filesystem.return_value = (True, None)
        share = {"name": self.SHARE_NAME, "share_proto": "NFS",
                 "size": self.SHARE_NEW_SIZE_GB}
        snapshot = {"name": self.SNAPSHOT_NAME, "size": self.SHARE_SIZE_GB}
        locations = self.storage_connection.create_share_from_snapshot(
            self.mock_context,
            share,
            snapshot
        )
        expected_locations = [
            {"path": "%s:/%s" % (self.NAS_SERVER_IP, self.SHARE_NAME),
             "metadata": {"preferred": True}}
        ]
        self.assertEqual(expected_locations, locations)
        client.get_nas_server_id.assert_called_with(
            self._mock_config.safe_get("dell_nas_server")
        )
        client.clone_snapshot.assert_called_with(
            self.SNAPSHOT_ID,
            self.SHARE_NAME
        )
        client.create_nfs_export.assert_called_with(
            self.CLONE_ID,
            self.SHARE_NAME
        )
        client.get_nas_server_interfaces.assert_called_with(
            self.NAS_SERVER_ID
        )

    def test_create_share_from_snapshot_cifs(self):
        """Cloning a snapshot into a CIFS share shares the clone over SMB."""
        client = self._mock_powerstore_client
        client.get_nas_server_id.return_value = self.NAS_SERVER_ID
        client.get_filesystem_id.return_value = self.SNAPSHOT_ID
        client.clone_snapshot.return_value = self.CLONE_ID
        # An SMB share id is the fitting stub for create_smb_share (an NFS
        # export id was used here before).
        client.create_smb_share.return_value = self.SMB_SHARE_ID
        client.get_nas_server_interfaces.return_value = [
            {"ip": self.NAS_SERVER_IP, "preferred": True}
        ]
        client.resize_filesystem.return_value = (True, None)

        share = {"name": self.SHARE_NAME, "share_proto": "CIFS",
                 "size": self.SHARE_NEW_SIZE_GB}
        snapshot = {"name": self.SNAPSHOT_NAME, "size": self.SHARE_SIZE_GB}
        locations = self.storage_connection.create_share_from_snapshot(
            self.mock_context,
            share,
            snapshot
        )
        expected_locations = [
            {"path": "\\\\%s\\%s" % (self.NAS_SERVER_IP, self.SHARE_NAME),
             "metadata": {"preferred": True}}
        ]
        self.assertEqual(expected_locations, locations)
        client.get_nas_server_id.assert_called_with(
            self._mock_config.safe_get("dell_nas_server")
        )
        client.clone_snapshot.assert_called_with(
            self.SNAPSHOT_ID,
            self.SHARE_NAME
        )
        client.create_smb_share.assert_called_with(
            self.CLONE_ID,
            self.SHARE_NAME
        )
        client.get_nas_server_interfaces.assert_called_with(
            self.NAS_SERVER_ID
        )

    def test_create_share_from_snapshot_clone_failure(self):
        """create_share_from_snapshot raises when the clone id is None."""
        share = {"name": self.SHARE_NAME, "share_proto": "NFS"}
        snapshot = {"name": self.SNAPSHOT_NAME}
        self._mock_powerstore_client.clone_snapshot.return_value = None

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_share_from_snapshot,
            self.mock_context,
            share,
            snapshot
        )

    def test_create_share_from_snapshot_export_failure(self):
        """create_share_from_snapshot raises when the NFS export fails."""
        share = {"name": self.SHARE_NAME, "share_proto": "NFS"}
        snapshot = {"name": self.SNAPSHOT_NAME}
        self._mock_powerstore_client.create_nfs_export.return_value = None

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_share_from_snapshot,
            self.mock_context,
            share,
            snapshot
        )

    def test_create_share_from_snapshot_share_failure(self):
        """create_share_from_snapshot raises when the SMB share fails."""
        share = {"name": self.SHARE_NAME, "share_proto": "CIFS"}
        snapshot = {"name": self.SNAPSHOT_NAME}
        self._mock_powerstore_client.create_smb_share.return_value = None

        self.assertRaises(
            exception.ShareBackendException,
            self.storage_connection.create_share_from_snapshot,
            self.mock_context,
            share,
            snapshot
        )

    def test_get_default_filter_function(self):
        """The default scheduler filter requires shares of at least 3 GB."""
        default_filter = self.storage_connection.get_default_filter_function()
        self.assertEqual("share.size >= 3", default_filter)