Coverage for manila/share/drivers/dell_emc/common/enas/xml_api_parser.py: 97%

191 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 Dell Inc. or its subsidiaries. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import io 

17import re 

18 

19from lxml import etree 

20 

21 

22class XMLAPIParser(object): 

23 def __init__(self): 

24 # The following Boolean acts as the flag for the common sub-element. 

25 # For instance: 

26 # <CifsServers> 

27 # <li> server_1 </li> 

28 # </CifsServers> 

29 # <Alias> 

30 # <li> interface_1 </li> 

31 # </Alias> 

32 self.is_QueryStatus = False 

33 self.is_CifsServers = False 

34 self.is_Aliases = False 

35 self.is_MoverStatus = False 

36 self.is_TaskResponse = False 

37 self.is_Vdm = False 

38 self.is_Interfaces = False 

39 

40 self.elt = {} 

41 

42 def _remove_ns(self, tag): 

43 i = tag.find('}') 

44 if i >= 0: 44 ↛ 46line 44 didn't jump to line 46 because the condition on line 44 was always true

45 tag = tag[i + 1:] 

46 return tag 

47 

48 def parse(self, xml): 

49 result = { 

50 'type': None, 

51 'taskId': None, 

52 'maxSeverity': None, 

53 'objects': [], 

54 'problems': [], 

55 } 

56 

57 events = ("start", "end") 

58 

59 context = etree.iterparse(io.BytesIO(xml), 

60 events=events) 

61 for action, elem in context: 

62 self.tag = self._remove_ns(elem.tag) 

63 

64 func = self._get_func(action, self.tag) 

65 if func in vars(XMLAPIParser): 

66 if action == 'start': 

67 eval('self.' + func)(elem, result) # nosec B307 

68 elif action == 'end': 68 ↛ 61line 68 didn't jump to line 61 because the condition on line 68 was always true

69 eval('self.' + func)() # nosec B307 

70 

71 return result 

72 

73 def _get_func(self, action, tag): 

74 if tag == 'W2KServerData': 

75 return action + '_' + 'w2k_server_data' 

76 

77 temp_list = re.sub(r"([A-Z])", r" \1", tag).split() 

78 if temp_list: 78 ↛ 81line 78 didn't jump to line 81 because the condition on line 78 was always true

79 func_name = action + '_' + '_'.join(temp_list) 

80 else: 

81 func_name = action + '_' + tag 

82 return func_name.lower() 

83 

84 def _copy_property(self, source, target, property, list_property=None): 

85 for key in property: 

86 if key in source: 

87 target[key] = source[key] 

88 

89 if list_property: 

90 for key in list_property: 

91 if key in source: 

92 target[key] = source[key].split() 

93 

94 def _append_elm_property(self, elm, result, property, identifier): 

95 for obj in result['objects']: 

96 if (identifier in obj and identifier in elm.attrib and 96 ↛ 95line 96 didn't jump to line 95 because the condition on line 96 was always true

97 elm.attrib[identifier] == obj[identifier]): 

98 for key, value in elm.attrib.items(): 

99 if key in property: 

100 obj[key] = value 

101 

102 def _append_element(self, elm, result, property, list_property, 

103 identifier): 

104 sub_elm = {} 

105 self._copy_property(elm.attrib, sub_elm, property, list_property) 

106 

107 for obj in result['objects']: 

108 if (identifier in obj and identifier in elm.attrib and 108 ↛ 107line 108 didn't jump to line 107 because the condition on line 108 was always true

109 elm.attrib[identifier] == obj[identifier]): 

110 if self.tag in obj: 

111 obj[self.tag].append(sub_elm) 

112 else: 

113 obj[self.tag] = [sub_elm] 

114 

115 def start_task_response(self, elm, result): 

116 self.is_TaskResponse = True 

117 result['type'] = 'TaskResponse' 

118 self._copy_property(elm.attrib, result, ['taskId']) 

119 

120 def end_task_response(self): 

121 self.is_TaskResponse = False 

122 

123 def start_fault(self, elm, result): 

124 result['type'] = 'Fault' 

125 

126 def start_status(self, elm, result): 

127 if self.is_TaskResponse: 

128 result['maxSeverity'] = elm.attrib['maxSeverity'] 

129 elif self.is_MoverStatus or self.is_Vdm: 

130 self.elt['maxSeverity'] = elm.attrib['maxSeverity'] 

131 

132 def start_query_status(self, elm, result): 

133 self.is_QueryStatus = True 

134 result['type'] = 'QueryStatus' 

135 self._copy_property(elm.attrib, result, ['maxSeverity']) 

136 

137 def end_query_status(self): 

138 self.is_QueryStatus = False 

139 

140 def start_problem(self, elm, result): 

141 self.elt = {} 

142 properties = ('message', 'messageCode') 

143 

144 self._copy_property(elm.attrib, self.elt, properties) 

145 result['problems'].append(self.elt) 

146 

147 def start_description(self, elm, result): 

148 self.elt['Description'] = elm.text 

149 

150 def start_action(self, elm, result): 

151 self.elt['Action'] = elm.text 

152 

153 def start_diagnostics(self, elm, result): 

154 self.elt['Diagnostics'] = elm.text 

155 

156 def start_file_system(self, elm, result): 

157 self.elt = {} 

158 property = ( 

159 'fileSystem', 

160 'name', 

161 'type', 

162 'storages', 

163 'volume', 

164 'dataServicePolicies', 

165 'internalUse', 

166 ) 

167 list_property = ('storagePools',) 

168 

169 self._copy_property(elm.attrib, self.elt, property, list_property) 

170 result['objects'].append(self.elt) 

171 

172 def start_file_system_capacity_info(self, elm, result): 

173 property = ('volumeSize',) 

174 

175 identifier = 'fileSystem' 

176 

177 self._append_elm_property(elm, result, property, identifier) 

178 

179 def start_storage_pool(self, elm, result): 

180 self.elt = {} 

181 property = ('name', 'autoSize', 'usedSize', 'diskType', 'pool', 

182 'dataServicePolicies', 'virtualProvisioning') 

183 list_property = ('movers',) 

184 

185 self._copy_property(elm.attrib, self.elt, property, list_property) 

186 result['objects'].append(self.elt) 

187 

188 def start_system_storage_pool_data(self, elm, result): 

189 property = ('greedy', 'isBackendPool') 

190 

191 self._copy_property(elm.attrib, self.elt, property) 

192 

193 def start_mover(self, elm, result): 

194 self.elt = {} 

195 property = ('name', 'host', 'mover', 'role') 

196 list_property = ('ntpServers', 'standbyFors', 'standbys') 

197 

198 self._copy_property(elm.attrib, self.elt, property, list_property) 

199 result['objects'].append(self.elt) 

200 

201 def start_mover_status(self, elm, result): 

202 self.is_MoverStatus = True 

203 

204 property = ('version', 'csTime', 'clock', 'timezone', 'uptime') 

205 

206 identifier = 'mover' 

207 

208 self._append_elm_property(elm, result, property, identifier) 

209 

210 def end_mover_status(self): 

211 self.is_MoverStatus = False 

212 

213 def start_mover_dns_domain(self, elm, result): 

214 property = ('name', 'protocol') 

215 list_property = ('servers',) 

216 

217 identifier = 'mover' 

218 

219 self._append_element(elm, result, property, list_property, identifier) 

220 

221 def start_mover_interface(self, elm, result): 

222 property = ( 

223 'name', 

224 'device', 

225 'up', 

226 'ipVersion', 

227 'netMask', 

228 'ipAddress', 

229 'vlanid', 

230 ) 

231 

232 identifier = 'mover' 

233 

234 self._append_element(elm, result, property, None, identifier) 

235 

236 def start_logical_network_device(self, elm, result): 

237 property = ('name', 'type', 'speed') 

238 list_property = ('interfaces',) 

239 identifier = 'mover' 

240 

241 self._append_element(elm, result, property, list_property, identifier) 

242 

243 def start_vdm(self, elm, result): 

244 self.is_Vdm = True 

245 

246 self.elt = {} 

247 property = ('name', 'state', 'mover', 'vdm') 

248 

249 self._copy_property(elm.attrib, self.elt, property) 

250 result['objects'].append(self.elt) 

251 

252 def end_vdm(self): 

253 self.is_Vdm = False 

254 

255 def start_interfaces(self, elm, result): 

256 self.is_Interfaces = True 

257 self.elt['Interfaces'] = [] 

258 

259 def end_interfaces(self): 

260 self.is_Interfaces = False 

261 

262 def start_li(self, elm, result): 

263 if self.is_CifsServers: 

264 self.elt['CifsServers'].append(elm.text) 

265 elif self.is_Aliases: 

266 self.elt['Aliases'].append(elm.text) 

267 elif self.is_Interfaces: 267 ↛ exitline 267 didn't return from function 'start_li' because the condition on line 267 was always true

268 self.elt['Interfaces'].append(elm.text) 

269 

270 def start_cifs_server(self, elm, result): 

271 self.elt = {} 

272 property = ('type', 'localUsers', 'name', 'mover', 'moverIdIsVdm') 

273 

274 list_property = ('interfaces',) 

275 

276 self._copy_property(elm.attrib, self.elt, property, list_property) 

277 result['objects'].append(self.elt) 

278 

279 def start_aliases(self, elm, result): 

280 self.is_Aliases = True 

281 self.elt['Aliases'] = [] 

282 

283 def end_aliases(self): 

284 self.is_Aliases = False 

285 

286 def start_w2k_server_data(self, elm, result): 

287 property = ('domain', 'compName', 'domainJoined') 

288 

289 self._copy_property(elm.attrib, self.elt, property) 

290 

291 def start_cifs_share(self, elm, result): 

292 self.elt = {} 

293 property = ('path', 'fileSystem', 'name', 'mover', 'moverIdIsVdm') 

294 

295 self._copy_property(elm.attrib, self.elt, property) 

296 result['objects'].append(self.elt) 

297 

298 def start_cifs_servers(self, elm, result): 

299 self.is_CifsServers = True 

300 self.elt['CifsServers'] = [] 

301 

302 def end_cifs_servers(self): 

303 self.is_CifsServers = False 

304 

305 def start_checkpoint(self, elm, result): 

306 self.elt = {} 

307 property = ('checkpointOf', 'name', 'checkpoint', 'state') 

308 

309 self._copy_property(elm.attrib, self.elt, property) 

310 result['objects'].append(self.elt) 

311 

312 def start_mount(self, elm, result): 

313 self.elt = {} 

314 property = ('fileSystem', 'path', 'mover', 'moverIdIsVdm') 

315 

316 self._copy_property(elm.attrib, self.elt, property) 

317 result['objects'].append(self.elt)