Coverage for manila/share/drivers/dell_emc/common/enas/xml_api_parser.py: 97%
191 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016 Dell Inc. or its subsidiaries.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import io
17import re
19from lxml import etree
class XMLAPIParser(object):
    """Event-driven parser that flattens an XML API response into a dict.

    ``parse`` walks the document with ``etree.iterparse`` and, for every
    start/end event, looks up a handler method named after the element:
    the namespace is stripped and the CamelCase tag is converted to
    ``start_<snake_case>`` / ``end_<snake_case>``.  Elements without a
    matching handler are ignored.

    Handlers communicate through instance state:

    * ``self.elt`` is the dict currently being filled (the last object or
      problem that was appended to the result).
    * ``self.tag`` is the namespace-free tag of the element being handled.
    * the ``is_*`` Booleans record which container element is currently
      open, so that common sub-elements can be attributed correctly.
      For instance ``<li>`` appears in all of::

          <CifsServers>
              <li> server_1 </li>
          </CifsServers>
          <Aliases>
              <li> interface_1 </li>
          </Aliases>

    NOTE(review): several handlers read ``elm.text`` on the 'start' event.
    lxml only guarantees an element's text on the 'end' event, so this
    relies on the text already having been parsed -- confirm this holds
    for large responses.
    """

    def __init__(self):
        self.tag = None
        self._reset_state()

    def _reset_state(self):
        """Clear the container flags and the element under construction."""
        self.is_QueryStatus = False
        self.is_CifsServers = False
        self.is_Aliases = False
        self.is_MoverStatus = False
        self.is_TaskResponse = False
        self.is_Vdm = False
        self.is_Interfaces = False

        self.elt = {}

    def _remove_ns(self, tag):
        """Return ``tag`` without a leading ``{namespace}`` qualifier."""
        return tag.split('}', 1)[-1]

    def parse(self, xml):
        """Parse an XML API response.

        :param xml: the raw response document as bytes.
        :returns: a dict with the keys ``type``, ``taskId``,
            ``maxSeverity``, ``objects`` (list of dicts) and ``problems``
            (list of dicts).
        """
        # Start from a clean slate: a previous parse that raised midway
        # would otherwise leave flags set, and a stale ``self.elt`` would
        # still alias an object inside a previously returned result.
        self._reset_state()

        result = {
            'type': None,
            'taskId': None,
            'maxSeverity': None,
            'objects': [],
            'problems': [],
        }

        context = etree.iterparse(io.BytesIO(xml), events=("start", "end"))
        for action, elem in context:
            self._dispatch(action, elem, result)

        return result

    def _dispatch(self, action, elem, result):
        """Invoke the handler for one parser event, if one is defined.

        :param action: ``'start'`` or ``'end'``.
        :param elem: the element the event refers to.
        :param result: the result dict being populated by ``parse``.
        """
        self.tag = self._remove_ns(elem.tag)

        func = self._get_func(action, self.tag)
        # Only names defined on this class are dispatched; the name always
        # begins with the action prefix, so nothing but handlers can match.
        if func not in vars(XMLAPIParser):
            return

        handler = getattr(self, func)
        if action == 'start':
            handler(elem, result)
        elif action == 'end':
            handler()

    def _get_func(self, action, tag):
        """Build the handler name for ``tag``, e.g. ``start_task_response``.

        A word boundary is inserted before every capital letter, which
        would split ``W2KServerData`` incorrectly, so it is special-cased.
        """
        if tag == 'W2KServerData':
            return action + '_' + 'w2k_server_data'

        words = re.sub(r"([A-Z])", r" \1", tag).split()
        suffix = '_'.join(words) if words else tag
        return (action + '_' + suffix).lower()

    def _copy_property(self, source, target, property, list_property=None):
        """Copy selected keys from ``source`` into ``target``.

        :param source: mapping to read from (typically ``elm.attrib``).
        :param target: dict to write into.
        :param property: keys copied verbatim when present in ``source``.
        :param list_property: keys whose value is a whitespace-separated
            string; copied as a list of tokens when present.
        """
        for key in property:
            if key in source:
                target[key] = source[key]

        if list_property:
            for key in list_property:
                if key in source:
                    target[key] = source[key].split()

    def _append_elm_property(self, elm, result, property, identifier):
        """Merge attributes of ``elm`` into the matching result object(s).

        An object matches when it has the same value as ``elm`` for the
        ``identifier`` attribute.  Only attributes named in ``property``
        are copied.
        """
        if identifier not in elm.attrib:
            return

        wanted = elm.attrib[identifier]
        for obj in result['objects']:
            if identifier in obj and obj[identifier] == wanted:
                for key, value in elm.attrib.items():
                    if key in property:
                        obj[key] = value

    def _append_element(self, elm, result, property, list_property,
                        identifier):
        """Attach ``elm`` as a sub-element of the matching result object(s).

        The selected attributes of ``elm`` are collected into a dict which
        is appended to the list stored under ``self.tag`` in every object
        whose ``identifier`` value equals that of ``elm``.
        """
        sub_elm = {}
        self._copy_property(elm.attrib, sub_elm, property, list_property)

        if identifier not in elm.attrib:
            return

        wanted = elm.attrib[identifier]
        for obj in result['objects']:
            if identifier in obj and obj[identifier] == wanted:
                obj.setdefault(self.tag, []).append(sub_elm)

    def _start_object(self, elm, result, attrs, list_attrs=None):
        """Begin a new top-level object and make it the current element."""
        self.elt = {}
        self._copy_property(elm.attrib, self.elt, attrs, list_attrs)
        result['objects'].append(self.elt)

    def start_task_response(self, elm, result):
        self.is_TaskResponse = True
        result['type'] = 'TaskResponse'
        self._copy_property(elm.attrib, result, ['taskId'])

    def end_task_response(self):
        self.is_TaskResponse = False

    def start_fault(self, elm, result):
        result['type'] = 'Fault'

    def start_status(self, elm, result):
        """Record ``maxSeverity`` on the response or the current object."""
        if self.is_TaskResponse:
            target = result
        elif self.is_MoverStatus or self.is_Vdm:
            target = self.elt
        else:
            return

        # Tolerate a missing attribute, as start_query_status does.
        self._copy_property(elm.attrib, target, ('maxSeverity',))

    def start_query_status(self, elm, result):
        self.is_QueryStatus = True
        result['type'] = 'QueryStatus'
        self._copy_property(elm.attrib, result, ['maxSeverity'])

    def end_query_status(self):
        self.is_QueryStatus = False

    def start_problem(self, elm, result):
        """Begin a new problem entry and make it the current element."""
        self.elt = {}
        attrs = ('message', 'messageCode')

        self._copy_property(elm.attrib, self.elt, attrs)
        result['problems'].append(self.elt)

    def start_description(self, elm, result):
        self.elt['Description'] = elm.text

    def start_action(self, elm, result):
        self.elt['Action'] = elm.text

    def start_diagnostics(self, elm, result):
        self.elt['Diagnostics'] = elm.text

    def start_file_system(self, elm, result):
        attrs = (
            'fileSystem',
            'name',
            'type',
            'storages',
            'volume',
            'dataServicePolicies',
            'internalUse',
        )
        list_attrs = ('storagePools',)

        self._start_object(elm, result, attrs, list_attrs)

    def start_file_system_capacity_info(self, elm, result):
        attrs = ('volumeSize',)

        self._append_elm_property(elm, result, attrs, 'fileSystem')

    def start_storage_pool(self, elm, result):
        attrs = ('name', 'autoSize', 'usedSize', 'diskType', 'pool',
                 'dataServicePolicies', 'virtualProvisioning')
        list_attrs = ('movers',)

        self._start_object(elm, result, attrs, list_attrs)

    def start_system_storage_pool_data(self, elm, result):
        attrs = ('greedy', 'isBackendPool')

        self._copy_property(elm.attrib, self.elt, attrs)

    def start_mover(self, elm, result):
        attrs = ('name', 'host', 'mover', 'role')
        list_attrs = ('ntpServers', 'standbyFors', 'standbys')

        self._start_object(elm, result, attrs, list_attrs)

    def start_mover_status(self, elm, result):
        self.is_MoverStatus = True

        attrs = ('version', 'csTime', 'clock', 'timezone', 'uptime')

        self._append_elm_property(elm, result, attrs, 'mover')

    def end_mover_status(self):
        self.is_MoverStatus = False

    def start_mover_dns_domain(self, elm, result):
        attrs = ('name', 'protocol')
        list_attrs = ('servers',)

        self._append_element(elm, result, attrs, list_attrs, 'mover')

    def start_mover_interface(self, elm, result):
        attrs = (
            'name',
            'device',
            'up',
            'ipVersion',
            'netMask',
            'ipAddress',
            'vlanid',
        )

        self._append_element(elm, result, attrs, None, 'mover')

    def start_logical_network_device(self, elm, result):
        attrs = ('name', 'type', 'speed')
        list_attrs = ('interfaces',)

        self._append_element(elm, result, attrs, list_attrs, 'mover')

    def start_vdm(self, elm, result):
        self.is_Vdm = True

        attrs = ('name', 'state', 'mover', 'vdm')

        self._start_object(elm, result, attrs)

    def end_vdm(self):
        self.is_Vdm = False

    def start_interfaces(self, elm, result):
        self.is_Interfaces = True
        self.elt['Interfaces'] = []

    def end_interfaces(self):
        self.is_Interfaces = False

    def start_li(self, elm, result):
        """Append a list item to whichever list container is open."""
        if self.is_CifsServers:
            self.elt['CifsServers'].append(elm.text)
        elif self.is_Aliases:
            self.elt['Aliases'].append(elm.text)
        elif self.is_Interfaces:
            self.elt['Interfaces'].append(elm.text)

    def start_cifs_server(self, elm, result):
        attrs = ('type', 'localUsers', 'name', 'mover', 'moverIdIsVdm')
        list_attrs = ('interfaces',)

        self._start_object(elm, result, attrs, list_attrs)

    def start_aliases(self, elm, result):
        self.is_Aliases = True
        self.elt['Aliases'] = []

    def end_aliases(self):
        self.is_Aliases = False

    def start_w2k_server_data(self, elm, result):
        attrs = ('domain', 'compName', 'domainJoined')

        self._copy_property(elm.attrib, self.elt, attrs)

    def start_cifs_share(self, elm, result):
        attrs = ('path', 'fileSystem', 'name', 'mover', 'moverIdIsVdm')

        self._start_object(elm, result, attrs)

    def start_cifs_servers(self, elm, result):
        self.is_CifsServers = True
        self.elt['CifsServers'] = []

    def end_cifs_servers(self):
        self.is_CifsServers = False

    def start_checkpoint(self, elm, result):
        attrs = ('checkpointOf', 'name', 'checkpoint', 'state')

        self._start_object(elm, result, attrs)

    def start_mount(self, elm, result):
        attrs = ('fileSystem', 'path', 'mover', 'moverIdIsVdm')

        self._start_object(elm, result, attrs)