Coverage for manila/tests/share/drivers/netapp/dataontap/protocols/test_nfs_cmode.py: 100%
133 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
# Copyright (c) 2015 Clinton Knight.  All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""
Mock unit tests for the NetApp driver protocols NFS class module.
"""
18import copy
19from unittest import mock
20import uuid
22import ddt
24from manila import exception
25from manila.share.drivers.netapp.dataontap.protocols import nfs_cmode
26from manila import test
27from manila.tests.share.drivers.netapp.dataontap.protocols \
28 import fakes as fake
@ddt.ddt
class NetAppClusteredNFSHelperTestCase(test.TestCase):
    """Tests for NetAppCmodeNFSHelper running against a mocked client."""

    def setUp(self):
        """Create the helper under test and attach a ``mock.Mock`` client."""
        super().setUp()

        self.mock_context = mock.Mock()
        self.mock_client = mock.Mock()
        self.helper = nfs_cmode.NetAppCmodeNFSHelper()
        self.helper.set_client(self.mock_client)

    @ddt.data(('1.2.3.4', '1.2.3.4'), ('fc00::1', '[fc00::1]'))
    @ddt.unpack
    def test__escaped_address(self, raw, escaped):
        """IPv4 input is returned as-is; IPv6 input comes back bracketed."""
        self.assertEqual(escaped, self.helper._escaped_address(raw))

    @ddt.data(True, False)
    def test_create_share(self, is_flexgroup):
        """create_share returns a callable mapping address -> export path."""

        mock_ensure_export_policy = self.mock_object(self.helper,
                                                     '_ensure_export_policy')
        self.mock_client.get_volume_junction_path.return_value = (
            fake.NFS_SHARE_PATH)
        self.mock_client.get_volume.return_value = {
            'junction-path': fake.NFS_SHARE_PATH,
        }

        result = self.helper.create_share(fake.NFS_SHARE, fake.SHARE_NAME,
                                          is_flexgroup=is_flexgroup)

        # The result is invoked once per address to build each export path.
        export_addresses = [fake.SHARE_ADDRESS_1, fake.SHARE_ADDRESS_2]
        export_paths = [result(address) for address in export_addresses]
        expected_paths = [
            fake.SHARE_ADDRESS_1 + ":" + fake.NFS_SHARE_PATH,
            fake.SHARE_ADDRESS_2 + ":" + fake.NFS_SHARE_PATH,
        ]
        self.assertEqual(expected_paths, export_paths)
        (self.mock_client.clear_nfs_export_policy_for_volume.
            assert_called_once_with(fake.SHARE_NAME))
        self.assertTrue(mock_ensure_export_policy.called)
        # The junction path is looked up through a different client call
        # depending on whether the share is a FlexGroup.
        if is_flexgroup:
            self.mock_client.get_volume.assert_called()
        else:
            self.mock_client.get_volume_junction_path.assert_called()

    def test_delete_share(self):
        """delete_share clears the volume policy and soft-deletes it."""

        self.helper.delete_share(fake.NFS_SHARE, fake.SHARE_NAME)

        (self.mock_client.clear_nfs_export_policy_for_volume.
            assert_called_once_with(fake.SHARE_NAME))
        self.mock_client.soft_delete_nfs_export_policy.assert_called_once_with(
            fake.EXPORT_POLICY_NAME)

    @ddt.data(True, False)
    def test_update_access(self, all_squash_metadata):
        """update_access fills a temp policy, then swaps it in by renaming.

        With ``all_squash`` share metadata the rule is added with the
        ``['none']`` auth methods instead of those from ``_get_auth_methods``.
        """

        self.mock_object(self.helper, '_ensure_export_policy')
        self.mock_object(self.helper,
                         '_get_export_policy_name',
                         mock.Mock(return_value='fake_export_policy'))
        # First temp name is used for the new policy, second for the old one.
        self.mock_object(self.helper,
                         '_get_temp_export_policy_name',
                         mock.Mock(side_effect=['fake_new_export_policy',
                                                'fake_old_export_policy']))
        fake_auth_method = 'fake_auth_method'
        self.mock_object(self.helper,
                         '_get_auth_methods',
                         mock.Mock(return_value=fake_auth_method))

        share = fake.NFS_SHARE.copy()
        if all_squash_metadata:
            share.update({'metadata': {'all_squash': 'true'}})
        else:
            share.update({'metadata': None})

        self.helper.update_access(share,
                                  fake.SHARE_NAME,
                                  [fake.IP_ACCESS])

        self.mock_client.create_nfs_export_policy.assert_called_once_with(
            'fake_new_export_policy')
        if all_squash_metadata:
            self.mock_client.add_nfs_export_rule.assert_called_once_with(
                'fake_new_export_policy', fake.CLIENT_ADDRESS_1, False,
                ['none'])
        else:
            self.mock_client.add_nfs_export_rule.assert_called_once_with(
                'fake_new_export_policy', fake.CLIENT_ADDRESS_1, False,
                fake_auth_method)

        (self.mock_client.set_nfs_export_policy_for_volume.
            assert_called_once_with(fake.SHARE_NAME, 'fake_new_export_policy'))
        (self.mock_client.soft_delete_nfs_export_policy.
            assert_called_once_with('fake_old_export_policy'))
        self.mock_client.rename_nfs_export_policy.assert_has_calls([
            mock.call('fake_export_policy', 'fake_old_export_policy'),
            mock.call('fake_new_export_policy', 'fake_export_policy'),
        ])

    def test_validate_access_rule(self):
        """A valid IP access rule is accepted and None is returned."""

        result = self.helper._validate_access_rule(fake.IP_ACCESS)

        self.assertIsNone(result)

    def test_validate_access_rule_invalid_type(self):
        """A 'user' access type raises InvalidShareAccess."""

        rule = copy.copy(fake.IP_ACCESS)
        rule['access_type'] = 'user'

        self.assertRaises(exception.InvalidShareAccess,
                          self.helper._validate_access_rule,
                          rule)

    def test_validate_access_rule_invalid_level(self):
        """A 'none' access level raises InvalidShareAccessLevel."""

        rule = copy.copy(fake.IP_ACCESS)
        rule['access_level'] = 'none'

        self.assertRaises(exception.InvalidShareAccessLevel,
                          self.helper._validate_access_rule,
                          rule)

    def test_get_target(self):
        """get_target returns the address part of the export location."""

        target = self.helper.get_target(fake.NFS_SHARE)
        self.assertEqual(fake.SHARE_ADDRESS_1, target)

    def test_get_share_name_for_share(self):
        """The share name is resolved from the volume at the junction path."""

        self.mock_client.get_volume_at_junction_path.return_value = (
            fake.VOLUME)

        share_name = self.helper.get_share_name_for_share(fake.NFS_SHARE)

        self.assertEqual(fake.SHARE_NAME, share_name)
        self.mock_client.get_volume_at_junction_path.assert_called_once_with(
            fake.NFS_SHARE_PATH)

    def test_get_share_name_for_share_not_found(self):
        """None is returned when no volume exists at the junction path."""

        self.mock_client.get_volume_at_junction_path.return_value = None

        share_name = self.helper.get_share_name_for_share(fake.NFS_SHARE)

        self.assertIsNone(share_name)
        self.mock_client.get_volume_at_junction_path.assert_called_once_with(
            fake.NFS_SHARE_PATH)

    def test_get_target_missing_location(self):
        """An empty export location yields an empty target."""

        target = self.helper.get_target({'export_location': ''})
        self.assertEqual('', target)

    def test_get_export_location(self):
        """The export location is split into host IP and export path."""

        export = fake.NFS_SHARE['export_location']
        self.mock_object(self.helper, '_get_share_export_location',
                         mock.Mock(return_value=export))

        host_ip, export_path = self.helper._get_export_location(
            fake.NFS_SHARE)
        self.assertEqual(fake.SHARE_ADDRESS_1, host_ip)
        self.assertEqual('/' + fake.SHARE_NAME, export_path)

    @ddt.data('', 'invalid')
    def test_get_export_location_missing_location_invalid(self, export):
        """Empty or unparseable locations yield a pair of empty strings."""

        fake_share = fake.NFS_SHARE.copy()
        fake_share['export_location'] = export
        self.mock_object(self.helper, '_get_share_export_location',
                         mock.Mock(return_value=export))

        host_ip, export_path = self.helper._get_export_location(fake_share)

        self.assertEqual('', host_ip)
        self.assertEqual('', export_path)
        self.helper._get_share_export_location.assert_called_once_with(
            fake_share)

    def test_get_temp_export_policy_name(self):
        """The temp policy name is 'temp_' plus the uuid1, '-' as '_'."""

        self.mock_object(uuid, 'uuid1', mock.Mock(return_value='fake-uuid'))

        result = self.helper._get_temp_export_policy_name()

        self.assertEqual('temp_fake_uuid', result)

    def test_get_export_policy_name(self):
        """The policy name for the fake share is fake.EXPORT_POLICY_NAME."""

        result = self.helper._get_export_policy_name(fake.NFS_SHARE)
        self.assertEqual(fake.EXPORT_POLICY_NAME, result)

    def test_ensure_export_policy_equal(self):
        """Nothing is created or renamed when the policy already matches."""

        self.mock_client.get_nfs_export_policy_for_volume.return_value = (
            fake.EXPORT_POLICY_NAME)

        self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)

        self.mock_client.create_nfs_export_policy.assert_not_called()
        self.mock_client.rename_nfs_export_policy.assert_not_called()

    def test_ensure_export_policy_default(self):
        """A volume on the 'default' policy gets a new policy assigned."""

        self.mock_client.get_nfs_export_policy_for_volume.return_value = (
            'default')

        self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)

        self.mock_client.create_nfs_export_policy.assert_called_once_with(
            fake.EXPORT_POLICY_NAME)
        (self.mock_client.set_nfs_export_policy_for_volume.
            assert_called_once_with(fake.SHARE_NAME, fake.EXPORT_POLICY_NAME))
        self.mock_client.rename_nfs_export_policy.assert_not_called()

    def test_ensure_export_policy_rename(self):
        """A differently named policy is renamed rather than recreated."""

        self.mock_client.get_nfs_export_policy_for_volume.return_value = 'fake'

        self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)

        self.mock_client.create_nfs_export_policy.assert_not_called()
        self.mock_client.rename_nfs_export_policy.assert_called_once_with(
            'fake', fake.EXPORT_POLICY_NAME)

    @ddt.data((False, ['sys']), (True, ['krb5', 'krb5i', 'krb5p']))
    @ddt.unpack
    def test__get_security_flavors(self, kerberos_enabled, security_flavors):
        """_get_auth_methods returns flavors based on Kerberos state.

        NOTE: the test name is historical; the method exercised here is
        ``_get_auth_methods``.
        """
        self.mock_client.is_kerberos_enabled.return_value = kerberos_enabled

        result = self.helper._get_auth_methods()

        self.assertEqual(security_flavors, result)

    def test_cleanup_demoted_replica(self):
        """cleanup_demoted_replica delegates to delete_share."""

        self.mock_object(self.helper, 'delete_share')
        self.helper.cleanup_demoted_replica(fake.NFS_SHARE, fake.SHARE_NAME)

        self.helper.delete_share.assert_called_once_with(fake.NFS_SHARE,
                                                         fake.SHARE_NAME)