Coverage for manila/share/drivers/maprfs/driver_util.py: 93%

238 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016, MapR Technologies 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15""" 

16Utility for processing MapR cluster operations 

17""" 

18 

19import json 

20import shlex 

21import socket 

22 

23from oslo_concurrency import processutils 

24from oslo_log import log 

25 

26from manila.common import constants 

27from manila import exception 

28from manila.i18n import _ 

29from manila import ssh_utils 

30from manila import utils 

31 

32LOG = log.getLogger(__name__) 

33 

34 

def get_version_handler(configuration):
    """Return the driver utility object for the given configuration.

    Only ``BaseDriverUtil`` exists at present; this factory is the place
    where a utility class could be chosen depending on the cluster version.

    :param configuration: driver configuration handed to the utility class.
    :returns: a ``BaseDriverUtil`` instance.
    """
    handler_cls = BaseDriverUtil
    return handler_cls(configuration)

38 

39 

class BaseDriverUtil(object):
    """Utility class for MapR-FS specific operations.

    Every operation is built as an argument list and run through
    ``_execute``, which tries the configured hosts one after another:
    locally (through ``sudo su``) when the host is one of this machine's
    addresses, over SSH otherwise.
    """

    NOT_FOUND_MSG = 'No such'
    ERROR_MSG = 'ERROR'

    def __init__(self, configuration):
        """Store the configuration and resolve this machine's addresses.

        :param configuration: object exposing the ``maprfs_*`` and ``ssh_*``
            options read by this class.
        """
        self.configuration = configuration
        # host -> (ssh_pool, ssh_client), filled lazily by _run_ssh.
        self.ssh_connections = {}
        # NOTE: this is the configuration's own list object; _execute
        # reorders it in place to keep a responsive host first.
        self.hosts = self.configuration.maprfs_clinode_ip
        self.local_hosts = socket.gethostbyname_ex(socket.gethostname())[2]
        self.maprcli_bin = '/usr/bin/maprcli'
        self.hadoop_bin = '/usr/bin/hadoop'

    def _execute(self, *cmd, **kwargs):
        """Run a command on the first configured host that accepts it.

        :param cmd: command and arguments, one string per argument.
        :param check_exit_code: keyword only, defaults to True; forwarded
            to the local or SSH executor.
        :returns: ``(stdout, stderr)`` of the command.
        :raises ProcessExecutionError: if no host is configured, if the
            command itself reported an error (see ``_check_error``), or if
            the last host failed as well.
        """
        # Read the flag once: popping it inside the loop would silently
        # reset it to True for every host after the first failure.
        check_exit_code = kwargs.pop('check_exit_code', True)
        if not self.hosts:
            raise exception.ProcessExecutionError(
                _('No MapR-FS hosts are configured.'))

        last_index = len(self.hosts) - 1
        for index, host in enumerate(self.hosts):
            try:
                if host in self.local_hosts:
                    # Keep ``cmd`` untouched so that a retry on the next
                    # host does not receive an already wrapped command.
                    local_cmd = self._as_user(
                        cmd, self.configuration.maprfs_ssh_name)
                    out, err = utils.execute(
                        *local_cmd, check_exit_code=check_exit_code)
                else:
                    out, err = self._run_ssh(host, cmd, check_exit_code)
                # move available cldb host to the beginning
                if index > 0:
                    self.hosts[0], self.hosts[index] = (
                        self.hosts[index], self.hosts[0])
                return out, err
            except exception.ProcessExecutionError as e:
                # An error reported by the command itself will not be cured
                # by another host; neither will running out of hosts.
                if self._check_error(e) or index == last_index:
                    raise
                LOG.error('Error running SSH command. Trying another host')
            except Exception as e:
                if index == last_index:
                    raise exception.ProcessExecutionError(str(e)) from e
                LOG.error('Error running SSH command. Trying another host')

    def _run_ssh(self, host, cmd_list, check_exit_code=False):
        """Run a command on ``host`` over a cached SSH connection.

        :param host: address of the remote host.
        :param cmd_list: command and arguments; each one is shell-quoted
            and joined into a single command line.
        :param check_exit_code: forwarded to ``processutils.ssh_execute``.
        :returns: the result of ``processutils.ssh_execute``.
        """
        command = ' '.join(shlex.quote(cmd_arg) for cmd_arg in cmd_list)
        connection = self.ssh_connections.get(host)
        if connection is None:
            conf = self.configuration
            ssh_pool = ssh_utils.SSHPool(host,
                                         conf.maprfs_ssh_port,
                                         conf.ssh_conn_timeout,
                                         conf.maprfs_ssh_name,
                                         password=conf.maprfs_ssh_pw,
                                         privatekey=(
                                             conf.maprfs_ssh_private_key),
                                         min_size=conf.ssh_min_pool_conn,
                                         max_size=conf.ssh_max_pool_conn)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)
        else:
            ssh_pool, ssh = connection

        # A client without a transport is as unusable as one whose
        # transport is inactive; replace the connection in both cases.
        transport = ssh.get_transport()
        if transport is None or not transport.is_active():
            ssh_pool.remove(ssh)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)
        return processutils.ssh_execute(
            ssh,
            command,
            check_exit_code=check_exit_code)

    @staticmethod
    def _check_error(error):
        """Tell whether the failed command itself reported an error.

        :param error: exception object with a ``stdout`` attribute.
        :returns: True if ``ERROR_MSG`` occurs in the captured stdout.
        """
        # check if error was native; stdout may be None when nothing
        # was captured.
        return BaseDriverUtil.ERROR_MSG in (error.stdout or '')

    @staticmethod
    def _as_user(cmd, user):
        """Wrap a command so that it runs as ``user`` through ``sudo su``.

        :param cmd: iterable of command arguments (strings).
        :param user: name of the user to switch to.
        :returns: argument list ending with the shell-quoted command line.
        """
        return ['sudo', 'su', '-', user, '-c',
                ' '.join(shlex.quote(cmd_arg) for cmd_arg in cmd)]

    @staticmethod
    def _add_params(cmd, **kwargs):
        """Append ``-key value`` pairs to a command list.

        :param cmd: list of command arguments; it is not modified.
        :param kwargs: option names (without the dash) mapped to values.
        :returns: a new list: ``cmd`` followed by the option pairs.
        """
        params = []
        for key, value in kwargs.items():
            params.extend(('-' + key, value))
        return cmd + params

    def create_volume(self, name, path, size, **kwargs):
        """Create a volume with a quota of ``size`` GB and empty ACEs.

        :param name: volume name.
        :param path: mount path of the volume.
        :param size: quota; ``'G'`` is appended to its string form.
        :param kwargs: extra ``maprcli volume create`` options; a ``quota``
            entry is ignored because the quota comes from ``size``.
        """
        # Drop any caller supplied quota, including falsy values such as 0,
        # so that '-quota' is never passed twice.
        kwargs.pop('quota', None)
        sizestr = str(size) + 'G'
        cmd = [self.maprcli_bin, 'volume', 'create', '-name',
               name, '-path', path, '-quota',
               sizestr, '-readAce', '', '-writeAce', '']
        cmd = self._add_params(cmd, **kwargs)
        self._execute(*cmd)

    def volume_exists(self, volume_name):
        """Return True unless ``volume info`` output says 'No such'."""
        cmd = [self.maprcli_bin, 'volume', 'info', '-name', volume_name]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return self.NOT_FOUND_MSG not in out

    def delete_volume(self, name):
        """Force-remove a volume; a missing volume is not an error.

        :raises ProcessExecutionError: if the output reports an error
            other than 'No such'.
        """
        cmd = [self.maprcli_bin, 'volume', 'remove', '-name', name, '-force',
               'true']
        out, __ = self._execute(*cmd, check_exit_code=False)
        # if volume does not exist do not raise exception.ProcessExecutionError
        if self.ERROR_MSG in out and self.NOT_FOUND_MSG not in out:
            raise exception.ProcessExecutionError(out)

    def set_volume_size(self, name, size):
        """Set the quota of volume ``name`` to ``size`` GB."""
        sizestr = str(size) + 'G'
        cmd = [self.maprcli_bin, 'volume', 'modify', '-name', name, '-quota',
               sizestr]
        self._execute(*cmd)

    def create_snapshot(self, name, volume_name):
        """Create snapshot ``name`` of volume ``volume_name``."""
        cmd = [self.maprcli_bin, 'volume', 'snapshot', 'create',
               '-snapshotname',
               name, '-volume', volume_name]
        self._execute(*cmd)

    def delete_snapshot(self, name, volume_name):
        """Remove a snapshot; a missing snapshot is not an error.

        :raises ProcessExecutionError: if the output reports an error
            other than 'No such'.
        """
        cmd = [self.maprcli_bin, 'volume', 'snapshot', 'remove',
               '-snapshotname',
               name, '-volume', volume_name]
        out, __ = self._execute(*cmd, check_exit_code=False)
        # if snapshot does not exist do not raise ProcessExecutionError
        if self.ERROR_MSG in out and self.NOT_FOUND_MSG not in out:
            raise exception.ProcessExecutionError(out)

    def get_volume_info(self, volume_name, columns=None):
        """Return the first ``data`` record of ``volume info -json``.

        :param volume_name: volume to look up by name.
        :param columns: optional iterable of column names to request.
        """
        cmd = [self.maprcli_bin, 'volume', 'info', '-name', volume_name,
               '-json']
        if columns:
            cmd += ['-columns', ','.join(columns)]
        out, __ = self._execute(*cmd)
        return json.loads(out)['data'][0]

    def get_volume_info_by_path(self, volume_path, columns=None,
                                check_if_exists=False):
        """Return the first ``data`` record of a volume looked up by path.

        :param volume_path: path of the volume.
        :param columns: optional iterable of column names to request.
        :param check_if_exists: when True the exit code is not checked and
            None is returned if the output says 'No such'.
        """
        cmd = [self.maprcli_bin, 'volume', 'info', '-path', volume_path,
               '-json']
        if columns:
            cmd += ['-columns', ','.join(columns)]
        out, __ = self._execute(*cmd, check_exit_code=not check_if_exists)
        if check_if_exists and self.NOT_FOUND_MSG in out:
            return None
        return json.loads(out)['data'][0]

    def get_snapshot_list(self, volume_name=None, volume_path=None):
        """Return the snapshot names of a volume.

        :param volume_name: volume name, passed as ``-volume``.
        :param volume_path: volume path, passed as ``-path``.
        :returns: list of the ``snapshotname`` values in the JSON output.
        """
        params = {}
        if volume_name:
            params['volume'] = volume_name
        if volume_path:
            # The path filter carries the path, not the volume name.
            params['path'] = volume_path
        # NOTE(review): the bare '-volume' below has no value of its own and
        # precedes '-columns'; it looks unintended but is kept as is because
        # its effect on maprcli cannot be confirmed from this file.
        cmd = [self.maprcli_bin, 'volume', 'snapshot', 'list', '-volume',
               '-columns',
               'snapshotname', '-json']
        cmd = self._add_params(cmd, **params)
        out, __ = self._execute(*cmd)
        return [x['snapshotname'] for x in json.loads(out)['data']]

    def rename_volume(self, name, new_name):
        """Rename volume ``name`` to ``new_name``."""
        cmd = [self.maprcli_bin, 'volume', 'rename', '-name', name, '-newname',
               new_name]
        self._execute(*cmd)

    def fs_capacity(self):
        """Return ``(total, free)`` parsed from ``hadoop fs -df``.

        The values are fields 1 and 3 of the second output line.

        :raises ProcessExecutionError: if the output cannot be parsed.
        """
        cmd = [self.hadoop_bin, 'fs', '-df']
        out, __ = self._execute(*cmd)
        lines = out.splitlines()
        try:
            fields = lines[1].split()
            total = int(fields[1])
            free = int(fields[3])
        except (IndexError, ValueError):
            msg = _('Failed to get MapR-FS capacity info.')
            LOG.exception(msg)
            raise exception.ProcessExecutionError(msg)
        return total, free

    def maprfs_ls(self, path):
        """Return the raw output of ``hadoop fs -ls path``."""
        cmd = [self.hadoop_bin, 'fs', '-ls', path]
        out, __ = self._execute(*cmd)
        return out

    def maprfs_cp(self, source, dest):
        """Run ``hadoop fs -cp -p source dest``."""
        cmd = [self.hadoop_bin, 'fs', '-cp', '-p', source, dest]
        self._execute(*cmd)

    def maprfs_chmod(self, dest, mod):
        """Run ``hadoop fs -chmod mod dest``."""
        cmd = [self.hadoop_bin, 'fs', '-chmod', mod, dest]
        self._execute(*cmd)

    def maprfs_du(self, path):
        """Return the first space-separated field of ``fs -du -s`` as int."""
        cmd = [self.hadoop_bin, 'fs', '-du', '-s', path]
        out, __ = self._execute(*cmd)
        return int(out.split(' ')[0])

    def check_state(self):
        """Return True if listing ``/`` prints 'Found'."""
        cmd = [self.hadoop_bin, 'fs', '-ls', '/']
        out, __ = self._execute(*cmd, check_exit_code=False)
        return 'Found' in out

    def dir_not_empty(self, path):
        """Return True if listing ``path`` prints 'Found'."""
        cmd = [self.hadoop_bin, 'fs', '-ls', path]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return 'Found' in out

    def set_volume_ace(self, volume_name, access_rules):
        """Replace the read and write ACEs of a volume.

        RO rules contribute to the read ACE; RW rules contribute to both
        the read and the write ACE. Rules with any other access level are
        ignored.

        :param volume_name: volume to modify.
        :param access_rules: iterable of dicts with ``access_level`` and
            ``access_to`` keys.
        """
        read_accesses = []
        write_accesses = []
        for access_rule in access_rules:
            level = access_rule['access_level']
            if level == constants.ACCESS_LEVEL_RO:
                read_accesses.append(access_rule['access_to'])
            elif level == constants.ACCESS_LEVEL_RW:
                read_accesses.append(access_rule['access_to'])
                write_accesses.append(access_rule['access_to'])

        # Each lookup runs commands through _execute, and RW names occur in
        # both lists, so resolve every distinct name only once.
        resolved = {}

        def rule_type(access_to):
            if access_to not in resolved:
                if self.group_exists(access_to):
                    resolved[access_to] = 'g'
                elif self.user_exists(access_to):
                    resolved[access_to] = 'u'
                else:
                    # if neither user nor group exists, it should try to
                    # add a group rule
                    resolved[access_to] = 'g'
            return resolved[access_to]

        read_accesses_string = '|'.join(
            rule_type(x) + ':' + x for x in read_accesses)
        write_accesses_string = '|'.join(
            rule_type(x) + ':' + x for x in write_accesses)
        cmd = [self.maprcli_bin, 'volume', 'modify', '-name', volume_name,
               '-readAce', read_accesses_string, '-writeAce',
               write_accesses_string]
        self._execute(*cmd)

    def add_volume_ace_rules(self, volume_name, access_rules):
        """Merge ``access_rules`` into the volume's current ACEs.

        A rule replaces an existing one with the same ``access_to``.
        Nothing is done when ``access_rules`` is empty.
        """
        if not access_rules:
            return
        access_rules_map = self.get_access_rules(volume_name)
        for access_rule in access_rules:
            access_rules_map[access_rule['access_to']] = access_rule
        self.set_volume_ace(volume_name, access_rules_map.values())

    def remove_volume_ace_rules(self, volume_name, access_rules):
        """Remove ``access_rules`` from the volume's current ACEs.

        Rules are matched by ``access_to``; unknown ones are ignored.
        Nothing is done when ``access_rules`` is empty.
        """
        if not access_rules:
            return
        access_rules_map = self.get_access_rules(volume_name)
        for access_rule in access_rules:
            access_rules_map.pop(access_rule['access_to'], None)
        self.set_volume_ace(volume_name, access_rules_map.values())

    def get_access_rules(self, volume_name):
        """Return the volume's ACEs as a map of ``access_to`` -> rule dict.

        The read ACE is processed first, so a name present in the write
        ACE ends up with the RW access level.
        """
        info = self.get_volume_info(volume_name)
        aces = info['volumeAces']
        read_ace = aces['readAce']
        write_ace = aces['writeAce']
        access_rules_map = {}
        self._retrieve_access_rules_from_ace(read_ace, 'r', access_rules_map)
        self._retrieve_access_rules_from_ace(write_ace, 'w', access_rules_map)
        return access_rules_map

    def _retrieve_access_rules_from_ace(self, ace, ace_type, access_rules_map):
        """Add the rules found in one ACE string to ``access_rules_map``.

        :param ace: ``|``-separated ``type:name`` entries; ``'p'``, an
            empty string or None yield no rules.
        :param ace_type: ``'w'`` gives the RW access level, anything else
            gives RO.
        :param access_rules_map: dict updated in place, keyed by name.
        """
        access = constants.ACCESS_LEVEL_RW if ace_type == 'w' else (
            constants.ACCESS_LEVEL_RO)
        if not ace or ace == 'p':
            return
        for entry in ace.split('|'):
            # partition() tolerates entries without a colon, which are
            # skipped just like entries of an unknown type.
            rule_type, separator, username = entry.strip().partition(':')
            if not separator or rule_type not in ('u', 'g'):
                continue
            access_rules_map[username] = {
                'access_level': access,
                'access_to': username,
                'access_type': 'user',
            }

    def user_exists(self, user):
        """Return True if ``getent passwd user`` prints anything."""
        cmd = ['getent', 'passwd', user]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return out != ''

    def group_exists(self, group):
        """Return True if ``getent group group`` prints anything."""
        cmd = ['getent', 'group', group]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return out != ''

    def get_cluster_name(self):
        """Return the cluster name from ``maprcli dashboard info -json``.

        :raises ProcessExecutionError: if the output is not valid JSON or
            does not have the expected structure.
        """
        cmd = [self.maprcli_bin, 'dashboard', 'info', '-json']
        out, __ = self._execute(*cmd)
        try:
            return json.loads(out)['data'][0]['cluster']['name']
        except (IndexError, KeyError, TypeError, ValueError) as e:
            # A missing or mistyped field is as much a parse failure as
            # malformed JSON is.
            msg = (_("Failed to parse cluster name. Error: %s") % e)
            raise exception.ProcessExecutionError(msg)