Coverage for manila/tests/share/drivers/dell_emc/plugins/unity/test_client.py: 96%
171 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
16from unittest import mock
18import ddt
19from oslo_utils import units
21from manila import exception
22from manila import test
23from manila.tests.share.drivers.dell_emc.plugins.unity import fake_exceptions
24from manila.tests.share.drivers.dell_emc.plugins.unity import res_mock
@ddt.ddt
class TestClient(test.TestCase):
    """Unit tests exercising the Unity client through ``res_mock`` fixtures.

    The ``res_mock.patch_client`` decorator appears to supply the ``client``
    argument and ``res_mock.mock_client_input`` the ``mocked_input`` mapping;
    both are defined outside this file.

    NOTE(review): the ``__existed_expt`` / ``__nonexistent_expt`` suffixes in
    the test names presumably mean the mocked backend raises an "already
    exists" / "not found" error that the client is expected to absorb. That
    fixture data is not visible here -- confirm against the res_mock
    definitions.
    """

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_cifs_share__existed_expt(self, client, mocked_input):
        """create_cifs_share returns a share with the requested name."""
        filesystem = mocked_input['filesystem']
        expected = mocked_input['cifs_share']

        created = client.create_cifs_share(filesystem, expected.name)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_nfs_share__existed_expt(self, client, mocked_input):
        """create_nfs_share returns a share with the requested name."""
        filesystem = mocked_input['filesystem']
        expected = mocked_input['nfs_share']

        created = client.create_nfs_share(filesystem, expected.name)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_nfs_filesystem_and_share(self, client, mocked_input):
        """create_nfs_filesystem_and_share completes without raising."""
        storage_pool = mocked_input['pool']
        server = mocked_input['nas_server']
        nfs_share = mocked_input['nfs_share']

        client.create_nfs_filesystem_and_share(
            storage_pool, server, nfs_share.name, nfs_share.size)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_get_share_with_invalid_proto(self, client, mocked_input):
        """get_share rejects the protocol string 'fake_proto'."""
        target = mocked_input['share']

        self.assertRaises(
            exception.BadConfigurationException,
            client.get_share, target.name, 'fake_proto')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_filesystem__existed_expt(self, client, mocked_input):
        """create_filesystem returns a filesystem with the requested name."""
        storage_pool = mocked_input['pool']
        server = mocked_input['nas_server']
        expected = mocked_input['filesystem']

        created = client.create_filesystem(
            storage_pool, server,
            expected.name, expected.size, expected.proto)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_delete_filesystem__nonexistent_expt(self, client, mocked_input):
        """delete_filesystem completes without raising."""
        target = mocked_input['filesystem']

        client.delete_filesystem(target)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_nas_server__existed_expt(self, client, mocked_input):
        """create_nas_server returns a server with the requested name."""
        storage_processor = mocked_input['sp']
        storage_pool = mocked_input['pool']
        expected = mocked_input['nas_server']

        created = client.create_nas_server(
            expected.name, storage_processor, storage_pool)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_delete_nas_server__nonexistent_expt(self, client, mocked_input):
        """delete_nas_server completes without raising."""
        target = mocked_input['nas_server']

        client.delete_nas_server(target.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_dns_server__existed_expt(self, client, mocked_input):
        """create_dns_server completes without raising."""
        server = mocked_input['nas_server']

        client.create_dns_server(server, 'fake_domain', 'fake_dns_ip')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_interface__existed_expt(self, client, mocked_input):
        """create_interface surfaces IPAddressInUse to the caller."""
        server = mocked_input['nas_server']

        self.assertRaises(
            exception.IPAddressInUse,
            client.create_interface,
            server, 'fake_ip_addr', 'fake_mask', 'fake_gateway',
            port_id='fake_port_id')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_enable_cifs_service__existed_expt(self, client, mocked_input):
        """enable_cifs_service completes without raising."""
        server = mocked_input['nas_server']

        client.enable_cifs_service(
            server, 'domain_name', 'fake_user', 'fake_passwd')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_enable_nfs_service__existed_expt(self, client, mocked_input):
        """enable_nfs_service completes without raising."""
        server = mocked_input['nas_server']

        client.enable_nfs_service(server)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_snapshot__existed_expt(self, client, mocked_input):
        """create_snapshot on a filesystem completes without raising."""
        # The snapshot source is the 'filesystem' fixture.
        filesystem = mocked_input['filesystem']
        expected = mocked_input['snapshot']

        client.create_snapshot(filesystem, expected.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_snap_of_snap__existed_expt(self, client, mocked_input):
        """create_snap_of_snap returns a snapshot with the requested name."""
        source = mocked_input['src_snapshot']
        expected = mocked_input['dest_snapshot']

        created = client.create_snap_of_snap(source, expected.name)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_delete_snapshot__nonexistent_expt(self, client, mocked_input):
        """delete_snapshot completes without raising."""
        target = mocked_input['snapshot']

        client.delete_snapshot(target)

    @res_mock.patch_client
    def test_cifs_deny_access__nonexistentuser_expt(self, client):
        """cifs_deny_access must not leak UnityAclUserNotFoundError."""
        try:
            client.cifs_deny_access('fake_share_name', 'fake_username')
        except fake_exceptions.UnityAclUserNotFoundError:
            self.fail("UnityAclUserNotFoundError raised unexpectedly!")

    @res_mock.patch_client
    def test_nfs_deny_access__nonexistent_expt(self, client):
        """nfs_deny_access completes without raising."""
        client.nfs_deny_access('fake_share_name', 'fake_ip_addr')

    @res_mock.patch_client
    def test_get_storage_processor(self, client):
        """get_storage_processor(sp_id='SPA') returns the SP named 'SPA'."""
        storage_processor = client.get_storage_processor(sp_id='SPA')

        self.assertEqual('SPA', storage_processor.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_extend_filesystem(self, client, mocked_input):
        """extend_filesystem(fs, 5) reports a size of 5 * units.Gi."""
        filesystem = mocked_input['fs']
        expected_size = 5 * units.Gi

        reported_size = client.extend_filesystem(filesystem, 5)

        self.assertEqual(expected_size, reported_size)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_shrink_filesystem(self, client, mocked_input):
        """shrink_filesystem(..., fs, 4) reports a size of 4 * units.Gi."""
        filesystem = mocked_input['fs']
        expected_size = 4 * units.Gi

        reported_size = client.shrink_filesystem(
            'fake_share_id_1', filesystem, 4)

        self.assertEqual(expected_size, reported_size)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_shrink_filesystem_size_too_small(self, client, mocked_input):
        """shrink_filesystem raises ShareShrinkingPossibleDataLoss here."""
        filesystem = mocked_input['fs']

        self.assertRaises(
            exception.ShareShrinkingPossibleDataLoss,
            client.shrink_filesystem, 'fake_share_id_2', filesystem, 4)

    @res_mock.patch_client
    def test_get_file_ports(self, client):
        """get_file_ports returns exactly two ports for this fixture."""
        file_ports = client.get_file_ports()

        self.assertEqual(2, len(file_ports))

    @res_mock.patch_client
    def test_get_tenant(self, client):
        """get_tenant('test', 5) returns the tenant with id 'tenant_1'."""
        found = client.get_tenant('test', 5)

        self.assertEqual('tenant_1', found.id)

    @res_mock.patch_client
    def test_get_tenant_preexist(self, client):
        """get_tenant('test', 6) returns the tenant with id 'tenant_1'."""
        found = client.get_tenant('test', 6)

        self.assertEqual('tenant_1', found.id)

    @res_mock.patch_client
    def test_get_tenant_name_inuse_but_vlan_not_used(self, client):
        """get_tenant('test', 7) raises UnityTenantNameInUseError."""
        self.assertRaises(
            fake_exceptions.UnityTenantNameInUseError,
            client.get_tenant, 'test', 7)

    @res_mock.patch_client
    def test_get_tenant_for_vlan_0(self, client):
        """get_tenant('tenant', 0) returns None."""
        found = client.get_tenant('tenant', 0)

        self.assertIsNone(found)

    @res_mock.patch_client
    def test_get_tenant_for_vlan_already_has_interfaces(self, client):
        """get_tenant('tenant', 3) returns the tenant with id 'tenant_1'."""
        found = client.get_tenant('tenant', 3)

        self.assertEqual('tenant_1', found.id)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_file_interface_ipv6(self, client, mocked_input):
        """create_interface forwards prefix_length as v6_prefix_length.

        A stand-in NAS server records the call so the exact arguments passed
        to ``create_file_interface`` can be checked.
        """
        nas_server = mock.Mock()
        nas_server.create_file_interface = mock.Mock(return_value=None)
        interface = mocked_input['file_interface']
        port_id = mock.Mock()

        client.create_interface(
            nas_server,
            interface.ip_addr,
            netmask=None,
            gateway=interface.gateway,
            port_id=port_id,
            vlan_id=interface.vlan_id,
            prefix_length=interface.prefix_length)

        nas_server.create_file_interface.assert_called_once_with(
            port_id,
            interface.ip_addr,
            netmask=None,
            v6_prefix_length=interface.prefix_length,
            gateway=interface.gateway,
            vlan_id=interface.vlan_id)

    @res_mock.patch_client
    def test_get_snapshot(self, client):
        """get_snapshot('Snapshot_1') returns the snapshot 'snapshot_1'."""
        found = client.get_snapshot('Snapshot_1')

        self.assertEqual('snapshot_1', found.id)

    @res_mock.patch_client
    def test_restore_snapshot(self, client):
        """restore_snapshot returns True and restores with delete_backup."""
        source = client.get_snapshot('Snapshot_1')

        outcome = client.restore_snapshot(source.name)

        # Identity check: the result must be the True singleton itself.
        self.assertIs(True, outcome)
        source.restore.assert_called_once_with(delete_backup=True)