Coverage for manila/share/drivers/maprfs/driver_util.py: 93%
238 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016, MapR Technologies
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15"""
16Utility for processing MapR cluster operations
17"""
19import json
20import shlex
21import socket
23from oslo_concurrency import processutils
24from oslo_log import log
26from manila.common import constants
27from manila import exception
28from manila.i18n import _
29from manila import ssh_utils
30from manila import utils
32LOG = log.getLogger(__name__)
def get_version_handler(configuration):
    """Build the utility object used to run MapR cluster operations.

    :param configuration: driver configuration handed to the utility class.
    :returns: a ``BaseDriverUtil`` instance bound to ``configuration``.
    """
    # Extension point: a cluster-version-specific DriverUtils class could be
    # selected here; today there is only the base implementation.
    handler = BaseDriverUtil(configuration)
    return handler
class BaseDriverUtil(object):
    """Utility class for MapR-FS specific operations.

    Commands are built as argument lists and run through ``_execute``, which
    tries every configured host in turn (locally via ``sudo su -`` when the
    host is one of this machine's addresses, over SSH otherwise).
    """
    # Substring of the command output that marks a missing volume/snapshot.
    NOT_FOUND_MSG = 'No such'
    # Substring of the command output that marks an error reported by the
    # MapR tooling itself (as opposed to a transport failure).
    ERROR_MSG = 'ERROR'

    def __init__(self, configuration):
        """Store the configuration and resolve the local host addresses.

        :param configuration: driver configuration; the ``maprfs_*`` and
            ``ssh_*`` options are read from it.
        """
        self.configuration = configuration
        # host -> (ssh_pool, ssh) cache filled lazily by _run_ssh
        self.ssh_connections = {}
        self.hosts = self.configuration.maprfs_clinode_ip
        self.local_hosts = socket.gethostbyname_ex(socket.gethostname())[2]
        self.maprcli_bin = '/usr/bin/maprcli'
        self.hadoop_bin = '/usr/bin/hadoop'

    def _execute(self, *cmd, **kwargs):
        """Run ``cmd`` on the first configured host that accepts it.

        :param cmd: command and its arguments, one string per argument.
        :param kwargs: only ``check_exit_code`` (default ``True``) is used.
        :returns: ``(out, err)`` of the command.
        :raises exception.ProcessExecutionError: if the command reports a
            native error, if every host failed, or if no host is configured.
        """
        # Read the option once: popping it inside the loop would make every
        # retry silently fall back to the default value.
        check_exit_code = kwargs.pop('check_exit_code', True)
        last_index = len(self.hosts) - 1
        for index, host in enumerate(self.hosts):
            try:
                if host in self.local_hosts:
                    # Keep ``cmd`` untouched so that a retry on another host
                    # does not run an already wrapped command.
                    local_cmd = self._as_user(
                        cmd, self.configuration.maprfs_ssh_name)
                    out, err = utils.execute(
                        *local_cmd, check_exit_code=check_exit_code)
                else:
                    out, err = self._run_ssh(host, cmd, check_exit_code)
                # move available cldb host to the beginning
                if index > 0:
                    self.hosts[0], self.hosts[index] = (
                        self.hosts[index], self.hosts[0])
                return out, err
            except exception.ProcessExecutionError as e:
                # A native error would be the same on any host: do not retry.
                if self._check_error(e) or index >= last_index:
                    raise
                msg = ('Error running SSH command. Trying another host')
                LOG.error(msg)
            except Exception as e:
                if index >= last_index:
                    raise exception.ProcessExecutionError(str(e))
                msg = ('Error running SSH command. Trying another host')
                LOG.error(msg)
        # Only reached when the host list is empty; fail loudly instead of
        # returning None, which callers would try to unpack.
        raise exception.ProcessExecutionError(
            _('No MapR-FS host is configured to run the command.'))

    def _run_ssh(self, host, cmd_list, check_exit_code=False):
        """Run a command on ``host`` over SSH.

        The SSH pool and connection are created on first use and cached per
        host; an inactive connection is replaced before running the command.

        :param host: address of the remote host.
        :param cmd_list: command arguments; each one is shell-quoted.
        :param check_exit_code: forwarded to ``processutils.ssh_execute``.
        :returns: the result of ``processutils.ssh_execute``.
        """
        command = ' '.join(shlex.quote(cmd_arg) for cmd_arg in cmd_list)
        connection = self.ssh_connections.get(host)
        if connection is None:
            ssh_name = self.configuration.maprfs_ssh_name
            password = self.configuration.maprfs_ssh_pw
            private_key = self.configuration.maprfs_ssh_private_key
            remote_ssh_port = self.configuration.maprfs_ssh_port
            ssh_conn_timeout = self.configuration.ssh_conn_timeout
            min_size = self.configuration.ssh_min_pool_conn
            max_size = self.configuration.ssh_max_pool_conn

            ssh_pool = ssh_utils.SSHPool(host,
                                         remote_ssh_port,
                                         ssh_conn_timeout,
                                         ssh_name,
                                         password=password,
                                         privatekey=private_key,
                                         min_size=min_size,
                                         max_size=max_size)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)
        else:
            ssh_pool, ssh = connection

        if not ssh.get_transport().is_active():
            ssh_pool.remove(ssh)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)
        return processutils.ssh_execute(
            ssh,
            command,
            check_exit_code=check_exit_code)

    @staticmethod
    def _check_error(error):
        """Return True if ``error.stdout`` contains the native error marker.

        A missing (``None``) stdout is treated as "not a native error".
        """
        # check if error was native
        return BaseDriverUtil.ERROR_MSG in (error.stdout or '')

    @staticmethod
    def _as_user(cmd, user):
        """Wrap ``cmd`` so that it runs in a login shell of ``user``.

        :returns: a ``sudo su - <user> -c <quoted command>`` argument list.
        """
        return ['sudo', 'su', '-', user, '-c',
                ' '.join(shlex.quote(cmd_arg) for cmd_arg in cmd)]

    @staticmethod
    def _add_params(cmd, **kwargs):
        """Return ``cmd`` extended with ``-<key> <value>`` for each kwarg."""
        params = []
        for key, value in kwargs.items():
            params.extend(['-' + key, value])
        return cmd + params

    def create_volume(self, name, path, size, **kwargs):
        """Create a volume with a ``size`` GB quota and empty ACEs.

        :param kwargs: extra ``maprcli volume create`` options; a ``quota``
            entry is ignored because the quota is derived from ``size``.
        """
        # delete size param as it is set separately (also when it is falsy,
        # otherwise '-quota' would be passed twice)
        kwargs.pop('quota', None)
        sizestr = str(size) + 'G'
        cmd = [self.maprcli_bin, 'volume', 'create', '-name',
               name, '-path', path, '-quota',
               sizestr, '-readAce', '', '-writeAce', '']
        cmd = self._add_params(cmd, **kwargs)
        self._execute(*cmd)

    def volume_exists(self, volume_name):
        """Return True unless ``volume info`` reports the volume missing."""
        cmd = [self.maprcli_bin, 'volume', 'info', '-name', volume_name]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return self.NOT_FOUND_MSG not in out

    def delete_volume(self, name):
        """Force-remove a volume; a missing volume is not an error.

        :raises exception.ProcessExecutionError: on any other reported error.
        """
        cmd = [self.maprcli_bin, 'volume', 'remove', '-name', name, '-force',
               'true']
        out, __ = self._execute(*cmd, check_exit_code=False)
        # if volume does not exist do not raise exception.ProcessExecutionError
        if self.ERROR_MSG in out and self.NOT_FOUND_MSG not in out:
            raise exception.ProcessExecutionError(out)

    def set_volume_size(self, name, size):
        """Set the quota of volume ``name`` to ``size`` GB."""
        sizestr = str(size) + 'G'
        cmd = [self.maprcli_bin, 'volume', 'modify', '-name', name, '-quota',
               sizestr]
        self._execute(*cmd)

    def create_snapshot(self, name, volume_name):
        """Create snapshot ``name`` of volume ``volume_name``."""
        cmd = [self.maprcli_bin, 'volume', 'snapshot', 'create',
               '-snapshotname',
               name, '-volume', volume_name]
        self._execute(*cmd)

    def delete_snapshot(self, name, volume_name):
        """Remove a snapshot; a missing snapshot is not an error.

        :raises exception.ProcessExecutionError: on any other reported error.
        """
        cmd = [self.maprcli_bin, 'volume', 'snapshot', 'remove',
               '-snapshotname',
               name, '-volume', volume_name]
        out, __ = self._execute(*cmd, check_exit_code=False)
        # if snapshot does not exist do not raise ProcessExecutionError
        if self.ERROR_MSG in out and self.NOT_FOUND_MSG not in out:
            raise exception.ProcessExecutionError(out)

    def get_volume_info(self, volume_name, columns=None):
        """Return the first ``data`` entry of ``volume info -json``.

        :param columns: optional iterable of column names to request.
        """
        cmd = [self.maprcli_bin, 'volume', 'info', '-name', volume_name,
               '-json']
        if columns:
            cmd += ['-columns', ','.join(columns)]
        out, __ = self._execute(*cmd)
        return json.loads(out)['data'][0]

    def get_volume_info_by_path(self, volume_path, columns=None,
                                check_if_exists=False):
        """Return volume info looked up by mount path.

        :param columns: optional iterable of column names to request.
        :param check_if_exists: when True the exit code is not checked and
            ``None`` is returned if the output reports a missing volume.
        """
        cmd = [self.maprcli_bin, 'volume', 'info', '-path', volume_path,
               '-json']
        if columns:
            cmd += ['-columns', ','.join(columns)]
        out, __ = self._execute(*cmd, check_exit_code=not check_if_exists)
        if check_if_exists and self.NOT_FOUND_MSG in out:
            return None
        return json.loads(out)['data'][0]

    def get_snapshot_list(self, volume_name=None, volume_path=None):
        """Return the snapshot names of a volume.

        :param volume_name: restrict the listing to this volume name.
        :param volume_path: restrict the listing to the volume at this path.
        """
        params = {}
        if volume_name:
            params['volume'] = volume_name
        if volume_path:
            params['path'] = volume_path
        # The filters are appended by _add_params; the base command must not
        # contain a value-less '-volume' flag.
        cmd = [self.maprcli_bin, 'volume', 'snapshot', 'list',
               '-columns',
               'snapshotname', '-json']
        cmd = self._add_params(cmd, **params)
        out, __ = self._execute(*cmd)
        return [x['snapshotname'] for x in json.loads(out)['data']]

    def rename_volume(self, name, new_name):
        """Rename volume ``name`` to ``new_name``."""
        cmd = [self.maprcli_bin, 'volume', 'rename', '-name', name, '-newname',
               new_name]
        self._execute(*cmd)

    def fs_capacity(self):
        """Return ``(total, free)`` parsed from ``hadoop fs -df``.

        The values are the 2nd and 4th fields of the second output line.

        :raises exception.ProcessExecutionError: if the output cannot be
            parsed.
        """
        cmd = [self.hadoop_bin, 'fs', '-df']
        out, __ = self._execute(*cmd)
        lines = out.splitlines()
        try:
            fields = lines[1].split()
            total = int(fields[1])
            free = int(fields[3])
        except (IndexError, ValueError):
            msg = _('Failed to get MapR-FS capacity info.')
            LOG.exception(msg)
            raise exception.ProcessExecutionError(msg)
        return total, free

    def maprfs_ls(self, path):
        """Return the raw output of ``hadoop fs -ls <path>``."""
        cmd = [self.hadoop_bin, 'fs', '-ls', path]
        out, __ = self._execute(*cmd)
        return out

    def maprfs_cp(self, source, dest):
        """Run ``hadoop fs -cp -p <source> <dest>``."""
        cmd = [self.hadoop_bin, 'fs', '-cp', '-p', source, dest]
        self._execute(*cmd)

    def maprfs_chmod(self, dest, mod):
        """Run ``hadoop fs -chmod <mod> <dest>``."""
        cmd = [self.hadoop_bin, 'fs', '-chmod', mod, dest]
        self._execute(*cmd)

    def maprfs_du(self, path):
        """Return the first field of ``hadoop fs -du -s <path>`` as an int."""
        cmd = [self.hadoop_bin, 'fs', '-du', '-s', path]
        out, __ = self._execute(*cmd)
        # Split on any whitespace so tabs or leading blanks do not break the
        # parsing.
        return int(out.split()[0])

    def check_state(self):
        """Return True if listing ``/`` reports found entries."""
        cmd = [self.hadoop_bin, 'fs', '-ls', '/']
        out, __ = self._execute(*cmd, check_exit_code=False)
        return 'Found' in out

    def dir_not_empty(self, path):
        """Return True if listing ``path`` reports found entries."""
        cmd = [self.hadoop_bin, 'fs', '-ls', path]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return 'Found' in out

    def set_volume_ace(self, volume_name, access_rules):
        """Replace the read/write ACEs of a volume with ``access_rules``.

        RO rules grant read access, RW rules grant read and write access;
        rules with any other access level are ignored.

        :param access_rules: iterable of dicts with ``access_level`` and
            ``access_to`` keys.
        """
        read_accesses = []
        write_accesses = []
        for access_rule in access_rules:
            if access_rule['access_level'] == constants.ACCESS_LEVEL_RO:
                read_accesses.append(access_rule['access_to'])
            elif access_rule['access_level'] == constants.ACCESS_LEVEL_RW:
                read_accesses.append(access_rule['access_to'])
                write_accesses.append(access_rule['access_to'])

        # Resolving a name costs remote calls, so remember the answer: RW
        # principals appear in both lists.
        rule_types = {}

        def rule_type(access_to):
            if access_to not in rule_types:
                # if nor user nor group exits, it should try add group rule
                is_user = (not self.group_exists(access_to) and
                           self.user_exists(access_to))
                rule_types[access_to] = 'u' if is_user else 'g'
            return rule_types[access_to]

        read_accesses_string = '|'.join(
            rule_type(x) + ':' + x for x in read_accesses)
        write_accesses_string = '|'.join(
            rule_type(x) + ':' + x for x in write_accesses)
        cmd = [self.maprcli_bin, 'volume', 'modify', '-name', volume_name,
               '-readAce', read_accesses_string, '-writeAce',
               write_accesses_string]
        self._execute(*cmd)

    def add_volume_ace_rules(self, volume_name, access_rules):
        """Merge ``access_rules`` into the volume's current ACEs.

        A rule for an already present ``access_to`` replaces the old one.
        """
        if not access_rules:
            return
        access_rules_map = self.get_access_rules(volume_name)
        for access_rule in access_rules:
            access_rules_map[access_rule['access_to']] = access_rule
        self.set_volume_ace(volume_name, access_rules_map.values())

    def remove_volume_ace_rules(self, volume_name, access_rules):
        """Drop ``access_rules`` from the volume's current ACEs."""
        if not access_rules:
            return
        access_rules_map = self.get_access_rules(volume_name)
        for access_rule in access_rules:
            access_rules_map.pop(access_rule['access_to'], None)
        self.set_volume_ace(volume_name, access_rules_map.values())

    def get_access_rules(self, volume_name):
        """Return the volume's ACEs as ``{access_to: access_rule}``.

        The write ACE is processed last, so a principal present in both
        ACEs ends up with the RW level.
        """
        info = self.get_volume_info(volume_name)
        aces = info['volumeAces']
        read_ace = aces['readAce']
        write_ace = aces['writeAce']
        access_rules_map = {}
        self._retrieve_access_rules_from_ace(read_ace, 'r', access_rules_map)
        self._retrieve_access_rules_from_ace(write_ace, 'w', access_rules_map)
        return access_rules_map

    def _retrieve_access_rules_from_ace(self, ace, ace_type, access_rules_map):
        """Parse an ACE string into ``access_rules_map`` (updated in place).

        :param ace: ``|``-separated ``<type>:<name>`` entries; ``'p'`` and
            ``''`` carry no rules.
        :param ace_type: ``'w'`` maps to the RW level, anything else to RO.
        :param access_rules_map: dict to fill, keyed by principal name.
        """
        access = constants.ACCESS_LEVEL_RW if ace_type == 'w' else (
            constants.ACCESS_LEVEL_RO)
        if ace in ['p', '']:
            return
        for entry in ace.split('|'):
            # partition() never raises: entries without a ':' or with an
            # unknown type are skipped instead of aborting the parsing.
            rule_type, separator, username = entry.strip().partition(':')
            if not separator or rule_type not in ['u', 'g']:
                continue
            access_rules_map[username] = {
                'access_level': access,
                'access_to': username,
                'access_type': 'user',
            }

    def user_exists(self, user):
        """Return True if ``getent passwd <user>`` prints anything."""
        cmd = ['getent', 'passwd', user]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return out != ''

    def group_exists(self, group):
        """Return True if ``getent group <group>`` prints anything."""
        cmd = ['getent', 'group', group]
        out, __ = self._execute(*cmd, check_exit_code=False)
        return out != ''

    def get_cluster_name(self):
        """Return the cluster name from ``maprcli dashboard info -json``.

        :raises exception.ProcessExecutionError: if the output is not JSON
            or does not have the expected ``data[0].cluster.name`` shape.
        """
        cmd = [self.maprcli_bin, 'dashboard', 'info', '-json']
        out, __ = self._execute(*cmd)
        try:
            return json.loads(out)['data'][0]['cluster']['name']
        except (IndexError, KeyError, TypeError, ValueError) as e:
            msg = (_("Failed to parse cluster name. Error: %s") % e)
            raise exception.ProcessExecutionError(msg)