Coverage for manila/share/drivers/ganesha/manager.py: 92%
355 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2014 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import io
17import os
18import re
19import shlex
20import sys
22from oslo_log import log
23from oslo_serialization import jsonutils
24from oslo_utils import importutils
26from manila import exception
27from manila.i18n import _
28from manila.share.drivers.ganesha import utils as ganesha_utils
29from manila import utils
31LOG = log.getLogger(__name__)
32IWIDTH = 4
35def _conf2json(conf):
36 """Convert Ganesha config to JSON."""
38 # tokenize config string
39 token_list = [io.StringIO()]
40 state = {
41 'in_quote': False,
42 'in_comment': False,
43 'escape': False,
44 }
46 cbk = []
47 for char in conf:
48 if state['in_quote']: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 if not state['escape']:
50 if char == '"':
51 state['in_quote'] = False
52 cbk.append(lambda: token_list.append(io.StringIO()))
53 elif char == '\\':
54 cbk.append(lambda: state.update({'escape': True}))
55 else:
56 if char == "#":
57 state['in_comment'] = True
58 if state['in_comment']:
59 if char == "\n":
60 state['in_comment'] = False
61 else:
62 if char == '"': 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true
63 token_list.append(io.StringIO())
64 state['in_quote'] = True
65 state['escape'] = False
66 if not state['in_comment']:
67 token_list[-1].write(char)
68 while cbk: 68 ↛ 69line 68 didn't jump to line 69 because the condition on line 68 was never true
69 cbk.pop(0)()
71 if state['in_quote']: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true
72 raise RuntimeError("Unterminated quoted string")
74 # jsonify tokens
75 js_token_list = ["{"]
76 for tok in token_list:
77 tok = tok.getvalue()
79 if tok[0] == '"': 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 js_token_list.append(tok)
81 continue
83 for pat, s in [
84 # add omitted "=" signs to block openings
85 (r'([^=\s])\s*{', '\\1={'),
86 # delete trailing semicolons in blocks
87 (r';\s*}', '}'),
88 # add omitted semicolons after blocks
89 (r'}\s*([^}\s])', '};\\1'),
90 # separate syntactically significant characters
91 (r'([;{}=])', ' \\1 ')]:
92 tok = re.sub(pat, s, tok)
94 # map tokens to JSON equivalents
95 for word in tok.split():
96 if word == "=":
97 word = ":"
98 elif word == ";":
99 word = ','
100 elif (word in ['{', '}'] or
101 re.search(r'\A-?[1-9]\d*(\.\d+)?\Z', word)):
102 pass
103 else:
104 word = jsonutils.dumps(word)
105 js_token_list.append(word)
106 js_token_list.append("}")
108 # group quoted strings
109 token_grp_list = []
110 for tok in js_token_list:
111 if tok[0] == '"':
112 if not (token_grp_list and isinstance(token_grp_list[-1], list)): 112 ↛ 114line 112 didn't jump to line 114 because the condition on line 112 was always true
113 token_grp_list.append([])
114 token_grp_list[-1].append(tok)
115 else:
116 token_grp_list.append(tok)
118 # process quoted string groups by joining them
119 js_token_list2 = []
120 for x in token_grp_list:
121 if isinstance(x, list):
122 x = ''.join(['"'] + [tok[1:-1] for tok in x] + ['"'])
123 js_token_list2.append(x)
125 return ''.join(js_token_list2)
128def _dump_to_conf(confdict, out=sys.stdout, indent=0):
129 """Output confdict in Ganesha config format."""
130 if isinstance(confdict, dict):
131 for k, v in confdict.items():
132 if v is None: 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true
133 continue
134 if isinstance(v, dict):
135 out.write(' ' * (indent * IWIDTH) + k + ' ')
136 out.write("{\n")
137 _dump_to_conf(v, out, indent + 1)
138 out.write(' ' * (indent * IWIDTH) + '}')
139 elif isinstance(v, list):
140 for item in v:
141 out.write(' ' * (indent * IWIDTH) + k + ' ')
142 out.write("{\n")
143 _dump_to_conf(item, out, indent + 1)
144 out.write(' ' * (indent * IWIDTH) + '}\n')
145 # The 'CLIENTS' Ganesha string option is an exception in that it's
146 # string value can't be enclosed within quotes as can be done for
147 # other string options in a valid Ganesha conf file.
148 elif k.upper() == 'CLIENTS':
149 out.write(' ' * (indent * IWIDTH) + k + ' = ' + v + ';')
150 else:
151 out.write(' ' * (indent * IWIDTH) + k + ' ')
152 out.write('= ')
153 _dump_to_conf(v, out, indent)
154 out.write(';')
155 out.write('\n')
156 else:
157 dj = jsonutils.dumps(confdict)
158 out.write(dj)
161def parseconf(conf):
162 """Parse Ganesha config.
164 Both native format and JSON are supported.
166 Convert config to a (nested) dictionary.
167 """
168 def list_to_dict(src_list):
169 # Convert a list of key-value pairs stored as tuples to a dict.
170 # For tuples with identical keys, preserve all the values in a
171 # list. e.g., argument [('k', 'v1'), ('k', 'v2')] to function
172 # returns {'k': ['v1', 'v2']}.
173 dst_dict = {}
174 for i in src_list:
175 if isinstance(i, tuple): 175 ↛ 174line 175 didn't jump to line 174 because the condition on line 175 was always true
176 k, v = i
177 if isinstance(v, list):
178 v = list_to_dict(v)
179 if k in dst_dict:
180 dst_dict[k] = [dst_dict[k]]
181 dst_dict[k].append(v)
182 else:
183 dst_dict[k] = v
184 return dst_dict
186 try:
187 # allow config to be specified in JSON --
188 # for sake of people who might feel Ganesha config foreign.
189 d = jsonutils.loads(conf)
190 except ValueError:
191 # Customize JSON decoder to convert Ganesha config to a list
192 # of key-value pairs stored as tuples. This allows multiple
193 # occurrences of a config block to be later converted to a
194 # dict key-value pair, with block name being the key and a
195 # list of block contents being the value.
196 li = jsonutils.loads(_conf2json(conf), object_pairs_hook=lambda x: x)
197 d = list_to_dict(li)
198 return d
201def mkconf(confdict):
202 """Create Ganesha config string from confdict."""
203 s = io.StringIO()
204 _dump_to_conf(confdict, s)
205 return s.getvalue()
208rados = None
211def setup_rados():
212 global rados
213 if not rados: 213 ↛ exitline 213 didn't return from function 'setup_rados' because the condition on line 213 was always true
214 try:
215 rados = importutils.import_module('rados')
216 except ImportError:
217 raise exception.ShareBackendException(
218 _("rados python module is not installed"))
221class GaneshaManager(object):
222 """Ganesha instrumentation class."""
224 def __init__(self, execute, tag, **kwargs):
225 self.confrx = re.compile(r'\.conf\Z')
226 self.ganesha_config_path = kwargs['ganesha_config_path']
227 self.tag = tag
229 def _execute(*args, **kwargs):
230 msg = kwargs.pop('message', args[0])
231 makelog = kwargs.pop('makelog', True)
232 try:
233 return execute(*args, **kwargs)
234 except exception.ProcessExecutionError as e:
235 if makelog:
236 LOG.error(
237 ("Error while executing management command on "
238 "Ganesha node %(tag)s: %(msg)s."),
239 {'tag': tag, 'msg': msg})
240 raise exception.GaneshaCommandFailure(
241 stdout=e.stdout, stderr=e.stderr, exit_code=e.exit_code,
242 cmd=e.cmd)
243 self.execute = _execute
244 self.ganesha_service = kwargs['ganesha_service_name']
245 self.ganesha_export_dir = kwargs['ganesha_export_dir']
246 self.execute('mkdir', '-p', self.ganesha_export_dir)
248 self.ganesha_rados_store_enable = kwargs.get(
249 'ganesha_rados_store_enable')
250 if self.ganesha_rados_store_enable:
251 setup_rados()
252 self.ganesha_rados_store_pool_name = (
253 kwargs['ganesha_rados_store_pool_name'])
254 self.ganesha_rados_export_counter = (
255 kwargs['ganesha_rados_export_counter'])
256 self.ganesha_rados_export_index = (
257 kwargs['ganesha_rados_export_index'])
258 self.rados_client = kwargs['rados_client']
259 try:
260 self._get_rados_object(self.ganesha_rados_export_counter)
261 except rados.ObjectNotFound:
262 self._put_rados_object(self.ganesha_rados_export_counter,
263 str(1000))
264 else:
265 self.ganesha_db_path = kwargs['ganesha_db_path']
266 self.execute('mkdir', '-p', os.path.dirname(self.ganesha_db_path))
267 # Here we are to make sure that an SQLite database of the
268 # required scheme exists at self.ganesha_db_path.
269 # The following command gets us there -- provided the file
270 # does not yet exist (otherwise it just fails). However,
271 # we don't care about this condition, we just execute the
272 # command unconditionally (ignoring failure). Instead we
273 # directly query the db right after, to check its validity.
274 self.execute(
275 "sqlite3", self.ganesha_db_path,
276 'create table ganesha(key varchar(20) primary key, '
277 'value int); insert into ganesha values("exportid", '
278 '100);', run_as_root=False, check_exit_code=False)
279 self.get_export_id(bump=False)
281 def _getpath(self, name):
282 """Get the path of config file for name."""
283 return os.path.join(self.ganesha_export_dir, name + ".conf")
285 @staticmethod
286 def _get_export_rados_object_name(name):
287 return 'ganesha-export-' + name
289 def _write_tmp_conf_file(self, path, data):
290 """Write data to tmp conf file."""
291 dirpath, fname = (getattr(os.path, q + "name")(path) for q in
292 ("dir", "base"))
293 tmpf = self.execute('mktemp', '-p', dirpath, "-t",
294 fname + ".XXXXXX")[0][:-1]
295 self.execute(
296 'sh', '-c',
297 'echo %s > %s' % (shlex.quote(data), shlex.quote(tmpf)),
298 message='writing ' + tmpf)
299 return tmpf
301 def _write_conf_file(self, name, data):
302 """Write data to config file for name atomically."""
303 path = self._getpath(name)
304 tmpf = self._write_tmp_conf_file(path, data)
305 try:
306 self.execute('mv', tmpf, path)
307 except exception.ProcessExecutionError as e:
308 LOG.error('mv temp file (%s) to %s failed.', tmpf, path)
309 self.execute('rm', tmpf)
310 raise exception.GaneshaCommandFailure(
311 stdout=e.stdout, stderr=e.stderr, exit_code=e.exit_code,
312 cmd=e.cmd)
314 return path
316 def _mkindex(self):
317 """Generate the index file for current exports."""
318 @utils.synchronized("ganesha-index-" + self.tag, external=True)
319 def _mkindex():
320 files = filter(lambda f: self.confrx.search(f) and
321 f != "INDEX.conf",
322 self.execute('ls', self.ganesha_export_dir,
323 run_as_root=False)[0].split("\n"))
324 index = "".join(map(lambda f: "%include " + os.path.join(
325 self.ganesha_export_dir, f) + "\n", files))
326 self._write_conf_file("INDEX", index)
327 _mkindex()
329 def _read_export_rados_object(self, name):
330 return parseconf(self._get_rados_object(
331 self._get_export_rados_object_name(name)))
333 def _read_export_file(self, name):
334 return parseconf(self.execute("cat", self._getpath(name),
335 message='reading export ' + name)[0])
337 def _read_export(self, name):
338 """Return the dict of the export identified by name."""
339 if self.ganesha_rados_store_enable:
340 return self._read_export_rados_object(name)
341 else:
342 return self._read_export_file(name)
344 def _check_export_rados_object_exists(self, name):
345 try:
346 self._get_rados_object(
347 self._get_export_rados_object_name(name))
348 return True
349 except rados.ObjectNotFound:
350 return False
352 def _check_file_exists(self, path):
353 try:
354 self.execute('test', '-f', path, makelog=False,
355 run_as_root=False)
356 return True
357 except exception.GaneshaCommandFailure as e:
358 if e.exit_code == 1:
359 return False
360 else:
361 raise exception.GaneshaCommandFailure(
362 stdout=e.stdout, stderr=e.stderr, exit_code=e.exit_code,
363 cmd=e.cmd)
365 def _check_export_file_exists(self, name):
366 return self._check_file_exists(self._getpath(name))
368 def check_export_exists(self, name):
369 """Check whether export exists."""
370 if self.ganesha_rados_store_enable:
371 return self._check_export_rados_object_exists(name)
372 else:
373 return self._check_export_file_exists(name)
375 def _write_export_rados_object(self, name, data):
376 """Write confdict to the export RADOS object of name."""
377 self._put_rados_object(self._get_export_rados_object_name(name),
378 data)
379 # temp export config file required for DBus calls
380 return self._write_tmp_conf_file(self._getpath(name), data)
382 def _write_export(self, name, confdict):
383 """Write confdict to the export file or RADOS object of name."""
384 for k, v in ganesha_utils.walk(confdict):
385 # values in the export block template that need to be
386 # filled in by Manila are pre-fixed by '@'
387 if isinstance(v, str) and v[0] == '@':
388 msg = _("Incomplete export block: value %(val)s of attribute "
389 "%(key)s is a stub.") % {'key': k, 'val': v}
390 raise exception.InvalidParameterValue(err=msg)
391 if self.ganesha_rados_store_enable:
392 return self._write_export_rados_object(name, mkconf(confdict))
393 else:
394 return self._write_conf_file(name, mkconf(confdict))
396 def _rm_file(self, path):
397 self.execute("rm", "-f", path)
399 def _rm_export_file(self, name):
400 """Remove export file of name."""
401 self._rm_file(self._getpath(name))
403 def _rm_export_rados_object(self, name):
404 """Remove export object of name."""
405 self._delete_rados_object(self._get_export_rados_object_name(name))
407 def _dbus_send_ganesha(self, method, *args, **kwargs):
408 """Send a message to Ganesha via dbus."""
409 service = kwargs.pop("service", "exportmgr")
410 self.execute("dbus-send", "--print-reply", "--system",
411 "--dest=org.ganesha.nfsd", "/org/ganesha/nfsd/ExportMgr",
412 "org.ganesha.nfsd.%s.%s" % (service, method), *args,
413 message='dbus call %s.%s' % (service, method), **kwargs)
415 def _remove_export_dbus(self, xid):
416 """Remove an export from Ganesha runtime with given export id."""
417 self._dbus_send_ganesha("RemoveExport", "uint16:%d" % xid)
419 def _add_rados_object_url_to_index(self, name):
420 """Add an export RADOS object's URL to the RADOS URL index."""
422 # TODO(rraja): Ensure that the export index object's update is atomic,
423 # e.g., retry object update until the object version between the 'get'
424 # and 'put' operations remains the same.
425 index_data = self._get_rados_object(self.ganesha_rados_export_index)
427 want_url = "%url rados://{0}/{1}".format(
428 self.ganesha_rados_store_pool_name,
429 self._get_export_rados_object_name(name))
431 if index_data:
432 self._put_rados_object(
433 self.ganesha_rados_export_index,
434 '\n'.join([index_data, want_url])
435 )
436 else:
437 self._put_rados_object(self.ganesha_rados_export_index, want_url)
439 def _remove_rados_object_url_from_index(self, name):
440 """Remove an export RADOS object's URL from the RADOS URL index."""
442 # TODO(rraja): Ensure that the export index object's update is atomic,
443 # e.g., retry object update until the object version between the 'get'
444 # and 'put' operations remains the same.
445 index_data = self._get_rados_object(self.ganesha_rados_export_index)
446 if not index_data:
447 return
449 unwanted_url = "%url rados://{0}/{1}".format(
450 self.ganesha_rados_store_pool_name,
451 self._get_export_rados_object_name(name))
453 rados_urls = index_data.split('\n')
454 new_rados_urls = [url for url in rados_urls if url != unwanted_url]
456 self._put_rados_object(self.ganesha_rados_export_index,
457 '\n'.join(new_rados_urls))
459 def add_export(self, name, confdict):
460 """Add an export to Ganesha specified by confdict."""
461 xid = confdict["EXPORT"]["Export_Id"]
462 undos = []
463 _mkindex_called = False
464 try:
465 path = self._write_export(name, confdict)
466 if self.ganesha_rados_store_enable:
467 undos.append(lambda: self._rm_export_rados_object(name))
468 undos.append(lambda: self._rm_file(path))
469 else:
470 undos.append(lambda: self._rm_export_file(name))
472 self._dbus_send_ganesha("AddExport", "string:" + path,
473 "string:EXPORT(Export_Id=%d)" % xid)
474 undos.append(lambda: self._remove_export_dbus(xid))
476 if self.ganesha_rados_store_enable:
477 # Clean up temp export file used for the DBus call
478 self._rm_file(path)
479 self._add_rados_object_url_to_index(name)
480 else:
481 _mkindex_called = True
482 self._mkindex()
483 except exception.ProcessExecutionError as e:
484 for u in undos:
485 u()
486 if not self.ganesha_rados_store_enable and not _mkindex_called:
487 self._mkindex()
488 raise exception.GaneshaCommandFailure(
489 stdout=e.stdout, stderr=e.stderr, exit_code=e.exit_code,
490 cmd=e.cmd)
492 def update_export(self, name, confdict):
493 """Update an export to Ganesha specified by confdict."""
494 xid = confdict["EXPORT"]["Export_Id"]
495 old_confdict = self._read_export(name)
497 path = self._write_export(name, confdict)
498 try:
499 self._dbus_send_ganesha("UpdateExport", "string:" + path,
500 "string:EXPORT(Export_Id=%d)" % xid)
501 except exception.ProcessExecutionError as e:
502 # Revert the export update.
503 self._write_export(name, old_confdict)
504 raise exception.GaneshaCommandFailure(
505 stdout=e.stdout, stderr=e.stderr, exit_code=e.exit_code,
506 cmd=e.cmd)
507 finally:
508 if self.ganesha_rados_store_enable:
509 # Clean up temp export file used for the DBus update call
510 self._rm_file(path)
512 def remove_export(self, name):
513 """Remove an export from Ganesha."""
514 try:
515 confdict = self._read_export(name)
516 self._remove_export_dbus(confdict["EXPORT"]["Export_Id"])
517 except Exception:
518 LOG.exception("There was a problem removing the export. "
519 "Ignoring errors and continuing operation.")
520 finally:
521 if self.ganesha_rados_store_enable:
522 self._delete_rados_object(
523 self._get_export_rados_object_name(name))
524 self._remove_rados_object_url_from_index(name)
525 else:
526 self._rm_export_file(name)
527 self._mkindex()
529 def _get_rados_object(self, object_name):
530 """Synchronously read data from Ceph RADOS object as a text string.
532 :param pool_name: name of the pool
533 :type pool_name: str
534 :param object_name: name of the object
535 :type object_name: str
536 :returns: tuple of object data and version
537 """
539 pool_name = self.ganesha_rados_store_pool_name
541 ioctx = self.rados_client.open_ioctx(pool_name)
543 osd_max_write_size = self.rados_client.conf_get('osd_max_write_size')
544 max_size = int(osd_max_write_size) * 1024 * 1024
545 try:
546 bytes_read = ioctx.read(object_name, max_size)
547 if ((len(bytes_read) == max_size) and 547 ↛ 549line 547 didn't jump to line 549 because the condition on line 547 was never true
548 (ioctx.read(object_name, 1, offset=max_size))):
549 LOG.warning("Size of object %s exceeds '%d' bytes "
550 "read", object_name, max_size)
551 finally:
552 ioctx.close()
554 bytes_read_decoded = bytes_read.decode('utf-8')
556 return bytes_read_decoded
558 def _put_rados_object(self, object_name, data):
559 """Synchronously write data as a byte string in a Ceph RADOS object.
561 :param pool_name: name of the pool
562 :type pool_name: str
563 :param object_name: name of the object
564 :type object_name: str
565 :param data: data to write
566 :type data: bytes
567 """
569 pool_name = self.ganesha_rados_store_pool_name
570 encoded_data = data.encode('utf-8')
572 ioctx = self.rados_client.open_ioctx(pool_name)
574 max_size = int(
575 self.rados_client.conf_get('osd_max_write_size')) * 1024 * 1024
576 if len(encoded_data) > max_size: 576 ↛ 577line 576 didn't jump to line 577 because the condition on line 576 was never true
577 msg = ("Data to be written to object '{0}' exceeds "
578 "{1} bytes".format(object_name, max_size))
579 LOG.error(msg)
580 raise exception.ShareBackendException(msg)
582 try:
583 with rados.WriteOpCtx() as wop:
584 wop.write_full(encoded_data)
585 ioctx.operate_write_op(wop, object_name)
586 except rados.OSError as e:
587 LOG.error(e)
588 raise e
589 finally:
590 ioctx.close()
592 def _delete_rados_object(self, object_name):
593 pool_name = self.ganesha_rados_store_pool_name
594 ioctx = self.rados_client.open_ioctx(pool_name)
595 try:
596 ioctx.remove_object(object_name)
597 except rados.ObjectNotFound:
598 LOG.warning("Object '%s' was already removed", object_name)
599 finally:
600 ioctx.close()
602 def get_export_id(self, bump=True):
603 """Get a new export id."""
604 # XXX overflowing the export id (16 bit unsigned integer)
605 # is not handled
606 if self.ganesha_rados_store_enable:
607 # TODO(rraja): Ensure that the export counter object's update is
608 # atomic, e.g., retry object update until the object version
609 # between the 'get' and 'put' operations remains the same.
610 export_id = int(
611 self._get_rados_object(self.ganesha_rados_export_counter))
612 if not bump:
613 return export_id
614 export_id += 1
615 self._put_rados_object(self.ganesha_rados_export_counter,
616 str(export_id))
617 return export_id
618 else:
619 if bump:
620 bumpcode = 'update ganesha set value = value + 1;'
621 else:
622 bumpcode = ''
623 out = self.execute(
624 "sqlite3", self.ganesha_db_path,
625 bumpcode + 'select * from ganesha ' # nosec B608
626 'where key = "exportid";',
627 run_as_root=False)[0]
628 match = re.search(r'\Aexportid\|(\d+)$', out)
629 if not match:
630 LOG.error("Invalid export database on "
631 "Ganesha node %(tag)s: %(db)s.",
632 {'tag': self.tag, 'db': self.ganesha_db_path})
633 raise exception.InvalidSqliteDB()
634 return int(match.groups()[0])
636 def restart_service(self):
637 """Restart the Ganesha service."""
638 self.execute("service", self.ganesha_service, "restart")
640 def reset_exports(self):
641 """Delete all export files."""
642 self.execute('sh', '-c',
643 'rm -f %s/*.conf' % shlex.quote(self.ganesha_export_dir))
644 self._mkindex()