Coverage for manila/tests/share/drivers/dell_emc/plugins/unity/test_client.py: 96%

171 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 EMC Corporation. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from unittest import mock 

17 

18import ddt 

19from oslo_utils import units 

20 

21from manila import exception 

22from manila import test 

23from manila.tests.share.drivers.dell_emc.plugins.unity import fake_exceptions 

24from manila.tests.share.drivers.dell_emc.plugins.unity import res_mock 

25 

26 

@ddt.ddt
class TestClient(test.TestCase):
    """Tests for the Unity client, driven by ``res_mock`` fixtures.

    The ``client`` and ``mocked_input`` arguments of each test are supplied
    by the ``res_mock`` decorators; the fixture data itself lives outside
    this module.

    NOTE(review): test names ending in ``__existed_expt`` and
    ``__nonexistent_expt`` presumably mean the fixture simulates an
    "already exists" / "not found" backend error that the client is
    expected to absorb -- confirm against the res_mock fixture data.
    """

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_cifs_share__existed_expt(self, client, mocked_input):
        """Expect create_cifs_share to return a share with the given name."""
        filesystem = mocked_input['filesystem']
        expected = mocked_input['cifs_share']

        created = client.create_cifs_share(filesystem, expected.name)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_nfs_share__existed_expt(self, client, mocked_input):
        """Expect create_nfs_share to return a share with the given name."""
        filesystem = mocked_input['filesystem']
        expected = mocked_input['nfs_share']

        created = client.create_nfs_share(filesystem, expected.name)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_nfs_filesystem_and_share(self, client, mocked_input):
        """Expect create_nfs_filesystem_and_share to run without raising."""
        storage_pool = mocked_input['pool']
        server = mocked_input['nas_server']
        nfs_share = mocked_input['nfs_share']

        client.create_nfs_filesystem_and_share(
            storage_pool, server, nfs_share.name, nfs_share.size)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_get_share_with_invalid_proto(self, client, mocked_input):
        """Expect get_share to reject an unknown protocol string."""
        fake_share = mocked_input['share']

        self.assertRaises(
            exception.BadConfigurationException,
            client.get_share, fake_share.name, 'fake_proto')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_filesystem__existed_expt(self, client, mocked_input):
        """Expect create_filesystem to return the named filesystem."""
        storage_pool = mocked_input['pool']
        server = mocked_input['nas_server']
        expected = mocked_input['filesystem']

        created = client.create_filesystem(
            storage_pool, server,
            expected.name, expected.size, expected.proto)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_delete_filesystem__nonexistent_expt(self, client, mocked_input):
        """Expect delete_filesystem to run without raising."""
        target = mocked_input['filesystem']

        client.delete_filesystem(target)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_nas_server__existed_expt(self, client, mocked_input):
        """Expect create_nas_server to return the named NAS server."""
        storage_processor = mocked_input['sp']
        storage_pool = mocked_input['pool']
        expected = mocked_input['nas_server']

        created = client.create_nas_server(
            expected.name, storage_processor, storage_pool)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_delete_nas_server__nonexistent_expt(self, client, mocked_input):
        """Expect delete_nas_server to run without raising."""
        server = mocked_input['nas_server']

        client.delete_nas_server(server.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_dns_server__existed_expt(self, client, mocked_input):
        """Expect create_dns_server to run without raising."""
        server = mocked_input['nas_server']

        client.create_dns_server(server, 'fake_domain', 'fake_dns_ip')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_interface__existed_expt(self, client, mocked_input):
        """Expect create_interface to raise IPAddressInUse."""
        server = mocked_input['nas_server']

        self.assertRaises(
            exception.IPAddressInUse,
            client.create_interface,
            server, 'fake_ip_addr', 'fake_mask', 'fake_gateway',
            port_id='fake_port_id')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_enable_cifs_service__existed_expt(self, client, mocked_input):
        """Expect enable_cifs_service to run without raising."""
        server = mocked_input['nas_server']

        client.enable_cifs_service(
            server, 'domain_name', 'fake_user', 'fake_passwd')

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_enable_nfs_service__existed_expt(self, client, mocked_input):
        """Expect enable_nfs_service to run without raising."""
        server = mocked_input['nas_server']

        client.enable_nfs_service(server)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_snapshot__existed_expt(self, client, mocked_input):
        """Expect create_snapshot to run without raising."""
        # The first argument is the 'filesystem' fixture entry.
        filesystem = mocked_input['filesystem']
        expected_snap = mocked_input['snapshot']

        client.create_snapshot(filesystem, expected_snap.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_snap_of_snap__existed_expt(self, client, mocked_input):
        """Expect create_snap_of_snap to return the named snapshot."""
        source_snap = mocked_input['src_snapshot']
        expected = mocked_input['dest_snapshot']

        created = client.create_snap_of_snap(source_snap, expected.name)

        self.assertEqual(expected.name, created.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_delete_snapshot__nonexistent_expt(self, client, mocked_input):
        """Expect delete_snapshot to run without raising."""
        target = mocked_input['snapshot']

        client.delete_snapshot(target)

    @res_mock.patch_client
    def test_cifs_deny_access__nonexistentuser_expt(self, client):
        """Expect cifs_deny_access not to leak UnityAclUserNotFoundError."""
        try:
            client.cifs_deny_access('fake_share_name', 'fake_username')
        except fake_exceptions.UnityAclUserNotFoundError:
            # Report a leaked exception as an explicit test failure.
            self.fail("UnityAclUserNotFoundError raised unexpectedly!")

    @res_mock.patch_client
    def test_nfs_deny_access__nonexistent_expt(self, client):
        """Expect nfs_deny_access to run without raising."""
        client.nfs_deny_access('fake_share_name', 'fake_ip_addr')

    @res_mock.patch_client
    def test_get_storage_processor(self, client):
        """Expect get_storage_processor to return the SP named 'SPA'."""
        found = client.get_storage_processor(sp_id='SPA')

        self.assertEqual('SPA', found.name)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_extend_filesystem(self, client, mocked_input):
        """Expect extend_filesystem(fs, 5) to return 5 * units.Gi."""
        filesystem = mocked_input['fs']

        new_size = client.extend_filesystem(filesystem, 5)

        self.assertEqual(5 * units.Gi, new_size)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_shrink_filesystem(self, client, mocked_input):
        """Expect shrink_filesystem(..., 4) to return 4 * units.Gi."""
        filesystem = mocked_input['fs']

        new_size = client.shrink_filesystem('fake_share_id_1', filesystem, 4)

        self.assertEqual(4 * units.Gi, new_size)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_shrink_filesystem_size_too_small(self, client, mocked_input):
        """Expect shrink_filesystem to raise ShareShrinkingPossibleDataLoss."""
        filesystem = mocked_input['fs']

        self.assertRaises(
            exception.ShareShrinkingPossibleDataLoss,
            client.shrink_filesystem, 'fake_share_id_2', filesystem, 4)

    @res_mock.patch_client
    def test_get_file_ports(self, client):
        """Expect get_file_ports to return two ports."""
        found_ports = client.get_file_ports()

        self.assertEqual(2, len(found_ports))

    @res_mock.patch_client
    def test_get_tenant(self, client):
        """Expect get_tenant('test', 5) to return tenant 'tenant_1'."""
        found = client.get_tenant('test', 5)

        self.assertEqual('tenant_1', found.id)

    @res_mock.patch_client
    def test_get_tenant_preexist(self, client):
        """Expect get_tenant('test', 6) to return tenant 'tenant_1'."""
        found = client.get_tenant('test', 6)

        self.assertEqual('tenant_1', found.id)

    @res_mock.patch_client
    def test_get_tenant_name_inuse_but_vlan_not_used(self, client):
        """Expect get_tenant('test', 7) to raise UnityTenantNameInUseError."""
        self.assertRaises(
            fake_exceptions.UnityTenantNameInUseError,
            client.get_tenant, 'test', 7)

    @res_mock.patch_client
    def test_get_tenant_for_vlan_0(self, client):
        """Expect get_tenant to return None when the VLAN argument is 0."""
        found = client.get_tenant('tenant', 0)

        self.assertIsNone(found)

    @res_mock.patch_client
    def test_get_tenant_for_vlan_already_has_interfaces(self, client):
        """Expect get_tenant('tenant', 3) to return tenant 'tenant_1'."""
        found = client.get_tenant('tenant', 3)

        self.assertEqual('tenant_1', found.id)

    @res_mock.mock_client_input
    @res_mock.patch_client
    def test_create_file_interface_ipv6(self, client, mocked_input):
        """Expect prefix_length to be forwarded as v6_prefix_length."""
        server = mock.Mock()
        server.create_file_interface = mock.Mock(return_value=None)
        iface = mocked_input['file_interface']
        port = mock.Mock()

        client.create_interface(
            server,
            iface.ip_addr,
            netmask=None,
            gateway=iface.gateway,
            port_id=port,
            vlan_id=iface.vlan_id,
            prefix_length=iface.prefix_length,
        )

        # The port id is expected first, ahead of the IP address.
        server.create_file_interface.assert_called_once_with(
            port,
            iface.ip_addr,
            netmask=None,
            v6_prefix_length=iface.prefix_length,
            gateway=iface.gateway,
            vlan_id=iface.vlan_id)

    @res_mock.patch_client
    def test_get_snapshot(self, client):
        """Expect get_snapshot('Snapshot_1') to return id 'snapshot_1'."""
        found = client.get_snapshot('Snapshot_1')

        self.assertEqual('snapshot_1', found.id)

    @res_mock.patch_client
    def test_restore_snapshot(self, client):
        """Expect restore_snapshot to return True and request a backup delete.

        The snapshot's ``restore`` must be called exactly once with
        ``delete_backup=True``.
        """
        snap = client.get_snapshot('Snapshot_1')

        restored = client.restore_snapshot(snap.name)

        self.assertIs(True, restored)
        snap.restore.assert_called_once_with(delete_backup=True)

270 snapshot.restore.assert_called_once_with(delete_backup=True)