Coverage for manila/tests/share/drivers/netapp/dataontap/protocols/test_nfs_cmode.py: 100%

133 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Clinton Knight. All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14""" 

15Mock unit tests for the NetApp driver protocols NFS class module. 

16""" 

17 

18import copy 

19from unittest import mock 

20import uuid 

21 

22import ddt 

23 

24from manila import exception 

25from manila.share.drivers.netapp.dataontap.protocols import nfs_cmode 

26from manila import test 

27from manila.tests.share.drivers.netapp.dataontap.protocols \ 

28 import fakes as fake 

29 

30 

@ddt.ddt
class NetAppClusteredNFSHelperTestCase(test.TestCase):
    """Exercise NetAppCmodeNFSHelper against a mocked backend client."""

    def setUp(self):
        """Create the helper under test and attach a mock client to it."""
        super().setUp()

        self.mock_context = mock.Mock()
        self.mock_client = mock.Mock()
        self.helper = nfs_cmode.NetAppCmodeNFSHelper()
        self.helper.set_client(self.mock_client)

    @ddt.data(('1.2.3.4', '1.2.3.4'), ('fc00::1', '[fc00::1]'))
    @ddt.unpack
    def test__escaped_address(self, raw, escaped):
        """Check the escaped form produced for an IPv4 and an IPv6 input."""
        actual = self.helper._escaped_address(raw)

        self.assertEqual(escaped, actual)

    @ddt.data(True, False)
    def test_create_share(self, is_flexgroup):
        """Check the export callback and client calls made by create_share."""
        client = self.mock_client
        ensure_policy = self.mock_object(self.helper,
                                         '_ensure_export_policy')
        client.get_volume_junction_path.return_value = fake.NFS_SHARE_PATH
        client.get_volume.return_value = {
            'junction-path': fake.NFS_SHARE_PATH,
        }

        export_callback = self.helper.create_share(
            fake.NFS_SHARE, fake.SHARE_NAME, is_flexgroup=is_flexgroup)

        # The returned callable maps an export address to "<address>:<path>".
        addresses = [fake.SHARE_ADDRESS_1, fake.SHARE_ADDRESS_2]
        actual_paths = list(map(export_callback, addresses))
        expected_paths = [
            address + ":" + fake.NFS_SHARE_PATH for address in addresses
        ]
        self.assertEqual(expected_paths, actual_paths)
        client.clear_nfs_export_policy_for_volume.assert_called_once_with(
            fake.SHARE_NAME)
        self.assertTrue(ensure_policy.called)

        # The flexgroup case must query get_volume; the other case must
        # query get_volume_junction_path.
        if is_flexgroup:
            path_lookup = client.get_volume
        else:
            path_lookup = client.get_volume_junction_path
        self.assertTrue(path_lookup.called)

    def test_delete_share(self):
        """Check delete_share clears the volume policy and soft-deletes it."""
        client = self.mock_client

        self.helper.delete_share(fake.NFS_SHARE, fake.SHARE_NAME)

        client.clear_nfs_export_policy_for_volume.assert_called_once_with(
            fake.SHARE_NAME)
        client.soft_delete_nfs_export_policy.assert_called_once_with(
            fake.EXPORT_POLICY_NAME)

    @ddt.data(True, False)
    def test_update_access(self, all_squash_metadata):
        """Check the policy create, rule add and rename calls on update."""
        client = self.mock_client
        fake_auth_method = 'fake_auth_method'
        self.mock_object(self.helper, '_ensure_export_policy')
        self.mock_object(
            self.helper, '_get_export_policy_name',
            mock.Mock(return_value='fake_export_policy'))
        # First temp name is used for the new policy, second for the old one.
        self.mock_object(
            self.helper, '_get_temp_export_policy_name',
            mock.Mock(side_effect=['fake_new_export_policy',
                                   'fake_old_export_policy']))
        self.mock_object(
            self.helper, '_get_auth_methods',
            mock.Mock(return_value=fake_auth_method))

        metadata = {'all_squash': 'true'} if all_squash_metadata else None
        share = fake.NFS_SHARE.copy()
        share.update({'metadata': metadata})

        self.helper.update_access(share, fake.SHARE_NAME, [fake.IP_ACCESS])

        client.create_nfs_export_policy.assert_called_once_with(
            'fake_new_export_policy')
        # With all_squash metadata the rule is expected to carry ['none']
        # instead of the value returned by _get_auth_methods.
        expected_auth = ['none'] if all_squash_metadata else fake_auth_method
        client.add_nfs_export_rule.assert_called_once_with(
            'fake_new_export_policy', fake.CLIENT_ADDRESS_1, False,
            expected_auth)

        client.set_nfs_export_policy_for_volume.assert_called_once_with(
            fake.SHARE_NAME, 'fake_new_export_policy')
        client.soft_delete_nfs_export_policy.assert_called_once_with(
            'fake_old_export_policy')
        expected_renames = [
            mock.call('fake_export_policy', 'fake_old_export_policy'),
            mock.call('fake_new_export_policy', 'fake_export_policy'),
        ]
        client.rename_nfs_export_policy.assert_has_calls(expected_renames)

    def test_validate_access_rule(self):
        """Check that the fake IP access rule validates and returns None."""
        outcome = self.helper._validate_access_rule(fake.IP_ACCESS)

        self.assertIsNone(outcome)

    def test_validate_access_rule_invalid_type(self):
        """Check that access type 'user' raises InvalidShareAccess."""
        bad_rule = copy.copy(fake.IP_ACCESS)
        bad_rule['access_type'] = 'user'

        self.assertRaises(exception.InvalidShareAccess,
                          self.helper._validate_access_rule,
                          bad_rule)

    def test_validate_access_rule_invalid_level(self):
        """Check that access level 'none' raises InvalidShareAccessLevel."""
        bad_rule = copy.copy(fake.IP_ACCESS)
        bad_rule['access_level'] = 'none'

        self.assertRaises(exception.InvalidShareAccessLevel,
                          self.helper._validate_access_rule,
                          bad_rule)

    def test_get_target(self):
        """Check get_target returns the first share address for the share."""
        actual_target = self.helper.get_target(fake.NFS_SHARE)

        self.assertEqual(fake.SHARE_ADDRESS_1, actual_target)

    def test_get_share_name_for_share(self):
        """Check the share name found via the junction-path lookup."""
        lookup = self.mock_client.get_volume_at_junction_path
        lookup.return_value = fake.VOLUME

        found_name = self.helper.get_share_name_for_share(fake.NFS_SHARE)

        self.assertEqual(fake.SHARE_NAME, found_name)
        lookup.assert_called_once_with(fake.NFS_SHARE_PATH)

    def test_get_share_name_for_share_not_found(self):
        """Check None is returned when the junction-path lookup finds none."""
        lookup = self.mock_client.get_volume_at_junction_path
        lookup.return_value = None

        found_name = self.helper.get_share_name_for_share(fake.NFS_SHARE)

        self.assertIsNone(found_name)
        lookup.assert_called_once_with(fake.NFS_SHARE_PATH)

    def test_get_target_missing_location(self):
        """Check get_target returns '' for an empty export location."""
        actual_target = self.helper.get_target({'export_location': ''})

        self.assertEqual('', actual_target)

    def test_get_export_location(self):
        """Check the export location is split into host IP and path."""
        location = fake.NFS_SHARE['export_location']
        self.mock_object(self.helper, '_get_share_export_location',
                         mock.Mock(return_value=location))

        host_ip, export_path = self.helper._get_export_location(
            fake.NFS_SHARE)

        self.assertEqual(fake.SHARE_ADDRESS_1, host_ip)
        self.assertEqual('/' + fake.SHARE_NAME, export_path)

    @ddt.data('', 'invalid')
    def test_get_export_location_missing_location_invalid(self, export):
        """Check empty or invalid locations give empty host and path."""
        share_without_export = fake.NFS_SHARE.copy()
        share_without_export['export_location'] = export
        self.mock_object(self.helper, '_get_share_export_location',
                         mock.Mock(return_value=export))

        host_ip, export_path = self.helper._get_export_location(
            share_without_export)

        self.assertEqual('', host_ip)
        self.assertEqual('', export_path)
        self.helper._get_share_export_location.assert_called_once_with(
            share_without_export)

    def test_get_temp_export_policy_name(self):
        """Check the temp policy name built from a patched uuid1 value."""
        self.mock_object(uuid, 'uuid1', mock.Mock(return_value='fake-uuid'))

        temp_name = self.helper._get_temp_export_policy_name()

        self.assertEqual('temp_fake_uuid', temp_name)

    def test_get_export_policy_name(self):
        """Check the policy name derived for the fake NFS share."""
        policy_name = self.helper._get_export_policy_name(fake.NFS_SHARE)

        self.assertEqual(fake.EXPORT_POLICY_NAME, policy_name)

    def test_ensure_export_policy_equal(self):
        """Check nothing is created or renamed when the policy matches."""
        client = self.mock_client
        client.get_nfs_export_policy_for_volume.return_value = (
            fake.EXPORT_POLICY_NAME)

        self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)

        self.assertFalse(client.create_nfs_export_policy.called)
        self.assertFalse(client.rename_nfs_export_policy.called)

    def test_ensure_export_policy_default(self):
        """Check a new policy is created and set when on 'default'."""
        client = self.mock_client
        client.get_nfs_export_policy_for_volume.return_value = 'default'

        self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)

        client.create_nfs_export_policy.assert_called_once_with(
            fake.EXPORT_POLICY_NAME)
        client.set_nfs_export_policy_for_volume.assert_called_once_with(
            fake.SHARE_NAME, fake.EXPORT_POLICY_NAME)
        self.assertFalse(client.rename_nfs_export_policy.called)

    def test_ensure_export_policy_rename(self):
        """Check a differently named policy is renamed, not recreated."""
        client = self.mock_client
        client.get_nfs_export_policy_for_volume.return_value = 'fake'

        self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)

        self.assertFalse(client.create_nfs_export_policy.called)
        client.rename_nfs_export_policy.assert_called_once_with(
            'fake', fake.EXPORT_POLICY_NAME)

    @ddt.data((False, ['sys']), (True, ['krb5', 'krb5i', 'krb5p']))
    @ddt.unpack
    def test__get_security_flavors(self, kerberos_enabled, security_flavors):
        """Check the auth methods returned with Kerberos off and on."""
        self.mock_client.is_kerberos_enabled.return_value = kerberos_enabled

        auth_methods = self.helper._get_auth_methods()

        self.assertEqual(security_flavors, auth_methods)

    def test_cleanup_demoted_replica(self):
        """Check cleanup_demoted_replica delegates to delete_share."""
        self.mock_object(self.helper, 'delete_share')

        self.helper.cleanup_demoted_replica(fake.NFS_SHARE, fake.SHARE_NAME)

        self.helper.delete_share.assert_called_once_with(fake.NFS_SHARE,
                                                         fake.SHARE_NAME)