Coverage for manila/tests/share/drivers/dell_emc/plugins/powerflex/test_object_manager.py: 100%
262 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2023 Dell Inc. or its subsidiaries.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from http import client as http_client
17import json
18import os
20import ddt
21import requests_mock
23from manila import exception
24from manila.share.drivers.dell_emc.plugins.powerflex import (
25 object_manager as manager
26)
27from manila import test
@ddt.ddt
class StorageObjectManagerTestCase(test.TestCase):
    """Unit tests for the PowerFlex ``StorageObjectManager`` REST client.

    Each test registers canned HTTP responses with ``requests_mock`` and
    then calls the matching manager method. JSON payloads are loaded from
    the ``mockup`` directory located next to this file.
    """

    def setUp(self):
        """Create the manager under test and locate the mockup directory."""
        super().setUp()

        self._mock_url = "https://192.168.0.110:443"
        self.manager = manager.StorageObjectManager(
            self._mock_url, username="admin", password="pwd", export_path=None
        )
        self.mockup_file_base = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), 'mockup')

    @ddt.data(False, True)
    def test__get_headers(self, got_token):
        """Presence of the Authorization header must match ``got_token``."""
        self.manager.got_token = got_token
        self.manager.rest_token = "token_str"
        headers = self.manager._get_headers()
        self.assertEqual(
            got_token, headers.get("Authorization") is not None
        )

    def _getJsonFile(self, filename):
        """Return the decoded JSON content of ``filename`` in the mockup dir.

        The file is read inside a ``with`` block so the handle is closed
        even when ``json.load`` raises on malformed content.
        """
        with open(os.path.join(self.mockup_file_base, filename)) as f:
            return json.load(f)

    @requests_mock.mock()
    def test_get_nas_server_id(self, m):
        """get_nas_server_id returns the id found in the mocked response."""
        nas_server = "env8nasserver"
        self.assertEqual(0, len(m.request_history))
        self._add_get_nas_server_id_response(
            m, nas_server, self._getJsonFile("get_nas_server_id_response.json")
        )
        server_id = self.manager.get_nas_server_id(nas_server)
        self.assertEqual("64132f37-d33e-9d4a-89ba-d625520a4779", server_id)

    def _add_get_nas_server_id_response(self, m, nas_server, json_str):
        """Mock a 200 reply for the NAS server id query.

        ``json_str`` is passed as the ``json=`` payload of the mocked reply
        (callers pass the decoded fixture, not a raw string).
        """
        url = "{0}/rest/v1/nas-servers?select=id&name=eq.{1}".format(
            self._mock_url, nas_server
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_create_filesystem(self, m):
        """create_filesystem returns the id from the mocked POST reply."""
        nas_server = "env8nasserver"
        self.assertEqual(0, len(m.request_history))
        self._add_get_nas_server_id_response(
            m, nas_server, self._getJsonFile("get_nas_server_id_response.json")
        )
        storage_pool_id = "8515fee00000000"
        self._add_create_filesystem_response(
            m, self._getJsonFile("create_filesystem_response.json")
        )
        fs_id = self.manager.create_filesystem(
            storage_pool_id,
            nas_server,
            name="Manila-filesystem",
            size=3221225472,
        )
        self.assertEqual("6432b79e-1cc3-0414-3ffd-2a50fb1ccff3", fs_id)

    def _add_create_filesystem_response(self, m, json_str):
        """Mock a 201 reply for the file-system creation POST."""
        url = "{0}/rest/v1/file-systems".format(self._mock_url)
        m.post(url, status_code=201, json=json_str)

    @requests_mock.mock()
    def test_create_nfs_export(self, m):
        """create_nfs_export returns the id from the mocked POST reply."""
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        name = "Manila-UT-filesystem"
        self.assertEqual(0, len(m.request_history))
        self._add_create_nfs_export_response(
            m, self._getJsonFile("create_nfs_export_response.json")
        )
        export_id = self.manager.create_nfs_export(filesystem_id, name)
        self.assertEqual("6433a2b2-6d60-f737-9f3b-2a50fb1ccff3", export_id)

    def _add_create_nfs_export_response(self, m, json_str):
        """Mock a 201 reply for the NFS export creation POST."""
        url = "{0}/rest/v1/nfs-exports".format(self._mock_url)
        m.post(url, status_code=201, json=json_str)

    @requests_mock.mock()
    def test_delete_filesystem(self, m):
        """delete_filesystem reports True when the DELETE returns 204."""
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        self.assertEqual(0, len(m.request_history))
        self._add_delete_filesystem_response(m, filesystem_id)
        result = self.manager.delete_filesystem(filesystem_id)
        self.assertEqual(True, result)

    def _add_delete_filesystem_response(self, m, filesystem_id):
        """Mock a 204 reply for the file-system DELETE."""
        url = "{0}/rest/v1/file-systems/{1}".format(
            self._mock_url, filesystem_id
        )
        m.delete(url, status_code=204)

    @requests_mock.mock()
    def test_create_snapshot(self, m):
        """create_snapshot reports True when the POST returns 201."""
        name = "Manila-UT-filesystem-snap"
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        self.assertEqual(0, len(m.request_history))
        self._add_create_snapshot_response(
            m,
            filesystem_id,
            self._getJsonFile("create_nfs_snapshot_response.json"),
        )
        result = self.manager.create_snapshot(name, filesystem_id)
        self.assertEqual(True, result)

    def _add_create_snapshot_response(self, m, filesystem_id, json_str):
        """Mock a 201 reply for the snapshot creation POST."""
        url = "{0}/rest/v1/file-systems/{1}/snapshot".format(
            self._mock_url, filesystem_id
        )
        m.post(url, status_code=201, json=json_str)

    @requests_mock.mock()
    def test_get_nfs_export_name(self, m):
        """get_nfs_export_name returns the name from the mocked reply."""
        export_id = "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3"
        self.assertEqual(0, len(m.request_history))
        self._add_get_nfs_export_name_response(
            m,
            export_id,
            self._getJsonFile("get_nfs_export_name_response.json"),
        )
        name = self.manager.get_nfs_export_name(export_id)
        self.assertEqual("Manila-UT-filesystem", name)

    def _add_get_nfs_export_name_response(self, m, export_id, json_str):
        """Mock a 200 reply for the NFS export detail query."""
        url = "{0}/rest/v1/nfs-exports/{1}?select=*".format(
            self._mock_url, export_id
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_get_filesystem_id(self, m):
        """get_filesystem_id returns the id from the mocked reply."""
        name = "Manila-UT-filesystem"
        self.assertEqual(0, len(m.request_history))
        self._add_get_filesystem_id_response(
            m, name, self._getJsonFile("get_fileystem_id_response.json")
        )
        fs_id = self.manager.get_filesystem_id(name)
        self.assertEqual("6432b79e-1cc3-0414-3ffd-2a50fb1ccff3", fs_id)

    def _add_get_filesystem_id_response(self, m, name, json_str):
        """Mock a 200 reply for the file-system id query by name."""
        url = "{0}/rest/v1/file-systems?select=id&name=eq.{1}".format(
            self._mock_url, name
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_get_nfs_export_id(self, m):
        """get_nfs_export_id returns the id from the mocked reply."""
        name = "Manila-UT-filesystem"
        self.assertEqual(0, len(m.request_history))
        self._add_get_nfs_export_id_response(
            m, name, self._getJsonFile("get_nfs_export_id_response.json")
        )
        export_id = self.manager.get_nfs_export_id(name)
        self.assertEqual("6433a2b2-6d60-f737-9f3b-2a50fb1ccff3", export_id)

    def _add_get_nfs_export_id_response(self, m, name, json_str):
        """Mock a 200 reply for the NFS export id query by name."""
        url = "{0}/rest/v1/nfs-exports?select=id&name=eq.{1}".format(
            self._mock_url, name
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_get_storage_pool_id(self, m):
        """get_storage_pool_id returns the id from the mocked reply."""
        protection_domain_name = "Env8-PD-1"
        storage_pool_name = "Env8-SP-SW_SSD-1"
        self.assertEqual(0, len(m.request_history))
        self._add_get_storage_pool_id_response(
            m, self._getJsonFile("get_storage_pool_id_response.json")
        )
        pool_id = self.manager.get_storage_pool_id(
            protection_domain_name, storage_pool_name
        )
        self.assertEqual("28515fee00000000", pool_id)

    def _add_get_storage_pool_id_response(self, m, json_str):
        """Mock a 200 reply for the storage pool ``queryIdByKey`` POST."""
        url = "{0}/api/types/StoragePool/instances/action/queryIdByKey".format(
            self._mock_url
        )
        m.post(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_set_export_access(self, m):
        """set_export_access reports True when the PATCH returns 204."""
        export_id = "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3"
        rw_hosts = "192.168.1.110"
        ro_hosts = "192.168.1.111"
        self.assertEqual(0, len(m.request_history))
        self._add_set_export_access_response(m, export_id)
        result = self.manager.set_export_access(export_id, rw_hosts, ro_hosts)
        self.assertEqual(True, result)

    def _add_set_export_access_response(self, m, export_id):
        """Mock a 204 reply for the NFS export PATCH."""
        url = "{0}/rest/v1/nfs-exports/{1}".format(self._mock_url, export_id)
        m.patch(url, status_code=204)

    @requests_mock.mock()
    def test_extend_export(self, m):
        """extend_export reports True when the PATCH returns 204."""
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        new_size = 6441225472
        self.assertEqual(0, len(m.request_history))
        self._add_extend_export_response(m, filesystem_id)
        result = self.manager.extend_export(filesystem_id, new_size)
        self.assertEqual(True, result)

    def _add_extend_export_response(self, m, filesystem_id):
        """Mock a 204 reply for the file-system PATCH."""
        url = "{0}/rest/v1/file-systems/{1}".format(
            self._mock_url, filesystem_id
        )
        m.patch(url, status_code=204)

    @requests_mock.mock()
    def test_get_fsid_from_export_name(self, m):
        """get_fsid_from_export_name returns the id from the mocked reply."""
        name = "Manila-UT-filesystem"
        self.assertEqual(0, len(m.request_history))
        self._add_get_fsid_from_export_name_response(
            m,
            name,
            self._getJsonFile("get_fsid_from_export_name_response.json"),
        )
        fs_id = self.manager.get_fsid_from_export_name(name)
        self.assertEqual("6432b79e-1cc3-0414-3ffd-2a50fb1ccff3", fs_id)

    def _add_get_fsid_from_export_name_response(self, m, name, json_str):
        """Mock a 200 reply for the export-name to file-system-id query."""
        url = (
            "{0}/rest/v1/nfs-exports?select=file_system_id&name=eq.{1}".format(
                self._mock_url, name
            )
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_get_fsid_from_snapshot_name(self, m):
        """get_fsid_from_snapshot_name returns the id from the mocked reply."""
        snapshot_name = "Manila-UT-filesystem-snap"
        self.assertEqual(0, len(m.request_history))
        self._add_get_fsid_from_snapshot_name_response(
            m,
            snapshot_name,
            self._getJsonFile("get_fsid_from_snapshot_name_response.json"),
        )
        fs_id = self.manager.get_fsid_from_snapshot_name(snapshot_name)
        self.assertEqual("6433b635-6c1f-878e-6467-2a50fb1ccff3", fs_id)

    def _add_get_fsid_from_snapshot_name_response(
        self, m, snapshot_name, json_str
    ):
        """Mock a 200 reply for the snapshot-name to file-system-id query."""
        url = "{0}/rest/v1/file-systems?select=id&name=eq.{1}".format(
            self._mock_url, snapshot_name
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_check_response_with_login_get(self, m):
        """A GET answered 401 then 200, with login mocked OK, yields the id."""
        nas_server = "env8nasserver"
        self.assertEqual(0, len(m.request_history))
        self._add_get_nas_server_id_response_list(m, nas_server)
        self._add_login_success_response(m)
        server_id = self.manager.get_nas_server_id(nas_server)
        self.assertEqual("64132f37-d33e-9d4a-89ba-d625520a4779", server_id)

    def _add_get_nas_server_id_response_list(self, m, nas_server):
        """Mock the NAS server id GET: first reply 401, second reply 200."""
        url = "{0}/rest/v1/nas-servers?select=id&name=eq.{1}".format(
            self._mock_url, nas_server
        )
        m.get(
            url,
            [
                {"status_code": http_client.UNAUTHORIZED},
                {
                    "status_code": 200,
                    "json": self._getJsonFile(
                        "get_nas_server_id_response.json"
                    ),
                },
            ],
        )

    def _add_login_success_response(self, m):
        """Mock a 200 reply carrying the login fixture for the login POST."""
        url = "{0}/rest/auth/login".format(self._mock_url)
        m.post(
            url, status_code=200, json=self._getJsonFile("login_response.json")
        )

    @requests_mock.mock()
    def test_check_response_with_login_post(self, m):
        """A POST answered 401 then 201, with login mocked OK, yields the id."""
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        name = "Manila-UT-filesystem"
        self.assertEqual(0, len(m.request_history))
        self._add_create_nfs_export_response_list(m)
        self._add_login_success_response(m)
        export_id = self.manager.create_nfs_export(filesystem_id, name)
        self.assertEqual("6433a2b2-6d60-f737-9f3b-2a50fb1ccff3", export_id)

    def _add_create_nfs_export_response_list(self, m):
        """Mock the NFS export POST: first reply 401, second reply 201."""
        url = "{0}/rest/v1/nfs-exports".format(self._mock_url)
        m.post(
            url,
            [
                {"status_code": http_client.UNAUTHORIZED},
                {
                    "status_code": 201,
                    "json": self._getJsonFile(
                        "create_nfs_export_response.json"
                    ),
                },
            ],
        )

    @requests_mock.mock()
    def test_check_response_with_login_delete(self, m):
        """A DELETE answered 401 then 204, with login mocked OK, is True."""
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        self.assertEqual(0, len(m.request_history))
        self._add_delete_filesystem_response_list(m, filesystem_id)
        self._add_login_success_response(m)
        result = self.manager.delete_filesystem(filesystem_id)
        self.assertEqual(True, result)

    def _add_delete_filesystem_response_list(self, m, filesystem_id):
        """Mock the file-system DELETE: first reply 401, second reply 204."""
        url = "{0}/rest/v1/file-systems/{1}".format(
            self._mock_url, filesystem_id
        )
        m.delete(
            url,
            [{"status_code": http_client.UNAUTHORIZED}, {"status_code": 204}],
        )

    @requests_mock.mock()
    def test_check_response_with_login_patch(self, m):
        """A PATCH answered 401 then 204, with login mocked OK, is True."""
        filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3"
        new_size = 6441225472
        self.assertEqual(0, len(m.request_history))
        self._add_extend_export_response_list(m, filesystem_id)
        self._add_login_success_response(m)
        result = self.manager.extend_export(filesystem_id, new_size)
        self.assertEqual(True, result)

    def _add_extend_export_response_list(self, m, filesystem_id):
        """Mock the file-system PATCH: first reply 401, second reply 204."""
        url = "{0}/rest/v1/file-systems/{1}".format(
            self._mock_url, filesystem_id
        )
        m.patch(
            url,
            [{"status_code": http_client.UNAUTHORIZED}, {"status_code": 204}],
        )

    @requests_mock.mock()
    def test_check_response_with_invalid_credential(self, m):
        """NotAuthorized is raised when both the GET and the login get 401."""
        nas_server = "env8nasserver"
        self.assertEqual(0, len(m.request_history))
        self._add_get_nas_server_id_unauthorized_response(m, nas_server)
        self._add_login_fail_response(m)
        self.assertRaises(
            exception.NotAuthorized, self.manager.get_nas_server_id, nas_server
        )

    def _add_get_nas_server_id_unauthorized_response(self, m, nas_server):
        """Mock a 401 reply for the NAS server id query."""
        url = "{0}/rest/v1/nas-servers?select=id&name=eq.{1}".format(
            self._mock_url, nas_server
        )
        m.get(url, status_code=http_client.UNAUTHORIZED)

    def _add_login_fail_response(self, m):
        """Mock a 401 reply for the login POST."""
        url = "{0}/rest/auth/login".format(self._mock_url)
        m.post(url, status_code=http_client.UNAUTHORIZED)

    @requests_mock.mock()
    def test_execute_powerflex_post_request_with_no_param(self, m):
        """A POST issued with only a URL exposes the mocked 201 status."""
        url = self._mock_url + "/fake_url"
        self.assertEqual(0, len(m.request_history))
        m.post(url, status_code=201)
        # The second element of the returned pair is not checked here.
        res, _ = self.manager.execute_powerflex_post_request(url)
        self.assertEqual(201, res.status_code)

    @requests_mock.mock()
    def test_execute_powerflex_patch_request_with_no_param(self, m):
        """A PATCH issued with only a URL exposes the mocked 204 status."""
        url = self._mock_url + "/fake_url"
        self.assertEqual(0, len(m.request_history))
        m.patch(url, status_code=204)
        res = self.manager.execute_powerflex_patch_request(url)
        self.assertEqual(204, res.status_code)

    @requests_mock.mock()
    def test_get_storage_pool_spare_percentage(self, m):
        """The spare percentage is read from the mocked storage pool reply."""
        storage_pool_id = "28515fee00000000"
        self.assertEqual(0, len(m.request_history))
        self._add_get_storage_pool_spare_percentage(
            m,
            storage_pool_id,
            self._getJsonFile("get_storage_pool_spare_percentage.json"),
        )
        spare = self.manager.get_storage_pool_spare_percentage(storage_pool_id)
        self.assertEqual(34, spare)

    def _add_get_storage_pool_spare_percentage(self, m, storage_pool_id,
                                               json_str):
        """Mock a 200 reply for the storage pool instance GET."""
        url = "{0}/api/instances/StoragePool::{1}".format(
            self._mock_url, storage_pool_id
        )
        m.get(url, status_code=200, json=json_str)

    @requests_mock.mock()
    def test_get_storage_pool_statistic(self, m):
        """Capacity figures are read from the mocked statistics reply."""
        storage_pool_id = "28515fee00000000"
        self.assertEqual(0, len(m.request_history))
        self._add_get_storage_pool_statistic(
            m,
            storage_pool_id,
            self._getJsonFile("get_storage_pool_statistic.json"),
        )
        statistic = self.manager.get_storage_pool_statistic(storage_pool_id)
        self.assertEqual(4826330112, statistic['maxCapacityInKb'])
        self.assertEqual(53217280, statistic['capacityInUseInKb'])
        self.assertEqual(1566080512, statistic['netUnusedCapacityInKb'])
        self.assertEqual(184549376, statistic['primaryVacInKb'])

    def _add_get_storage_pool_statistic(self, m, storage_pool_id,
                                        json_str):
        """Mock a 200 reply for the storage pool statistics GET."""
        url = (
            "{0}/api/instances/StoragePool::{1}/relationships/Statistics"
        ).format(self._mock_url, storage_pool_id)
        m.get(url, status_code=200, json=json_str)