Coverage for manila/share/drivers/ganesha/utils.py: 94%
68 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2014 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import os
17import shlex
19from oslo_concurrency import processutils
20from oslo_log import log
22from manila import exception
23from manila.i18n import _
24from manila import ssh_utils
25from manila import utils
27LOG = log.getLogger(__name__)
def patch(base, *overlays):
    """Merge each overlay into ``base`` in place, recursing into dicts.

    A key is merged recursively only when both the overlay value and the
    value already stored in ``base`` are dicts; in every other case the
    overlay value replaces whatever ``base`` held.

    :param base: dictionary to be updated in place.
    :param overlays: dictionaries applied on top of ``base``, in order.
    :return: ``base`` (the same object that was passed in).
    """
    for overlay in overlays:
        for key, value in overlay.items():
            nested = isinstance(value, dict) and isinstance(
                base.get(key), dict)
            if nested:
                patch(base[key], value)
                continue
            base[key] = value
    return base
def walk(dct):
    """Yield the (key, value) pairs found at the leaves of a nested dict.

    Values that are themselves dicts are descended into rather than
    yielded; the keys leading to a nested dict are not reported.

    :param dct: dictionary, possibly containing nested dictionaries.
    """
    for key, value in dct.items():
        if not isinstance(value, dict):
            yield key, value
            continue
        yield from walk(value)
class RootExecutor(object):
    """Wrap an execute callable so that it runs as root by default."""

    def __init__(self, execute=utils.execute):
        # The callable every invocation is delegated to.
        self.execute = execute

    def __call__(self, *args, **kwargs):
        """Invoke the wrapped callable with ``run_as_root`` defaulting to True.

        An explicit ``run_as_root`` keyword from the caller takes
        precedence over the default.
        """
        merged = {"run_as_root": True, **kwargs}
        return self.execute(*args, **merged)
class SSHExecutor(object):
    """Callable that runs commands over ssh using a connection pool."""

    def __init__(self, *args, **kwargs):
        # All constructor arguments are handed to the pool unchanged.
        self.pool = ssh_utils.SSHPool(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Quote ``args`` into one command line and execute it over ssh.

        ``run_as_root`` is consumed here; the remaining keyword arguments
        are passed on to ``processutils.ssh_execute``.
        """
        # processutils.ssh_execute(), unlike processutils.execute(), does
        # not accept a 'run_as_root=' keyword, so it is stripped from the
        # kwargs and emulated by prefixing the command with 'sudo'.
        as_root = kwargs.pop('run_as_root', False)
        command = ' '.join([shlex.quote(arg) for arg in args])
        if as_root:
            command = 'sudo ' + command
        connection = self.pool.get()
        try:
            return processutils.ssh_execute(connection, command, **kwargs)
        finally:
            # Hand the connection back whether or not the command failed.
            self.pool.put(connection)
def path_from(fpath, *rpath):
    """Join ``rpath`` onto the absolute directory that contains ``fpath``.

    :param fpath: path whose directory part is used as the base.
    :param rpath: path components appended to that directory.
    :return: the resulting path, with the directory part made absolute.
    """
    base_dir = os.path.abspath(os.path.dirname(fpath))
    return os.path.join(base_dir, *rpath)
def validate_access_rule(supported_access_types, supported_access_levels,
                         access_rule, abort=False):
    """Validate an access rule.

    Both the access type and the access level are always checked, so a
    rule that fails on both counts is reported twice when ``abort`` is
    false.

    :param supported_access_types: List of access types that are regarded
                                   valid.
    :param supported_access_levels: List of access levels that are
                                    regarded valid.
    :param access_rule: Access rule to be validated. Either a dict, or an
                        object offering ``to_dict()`` and item access for
                        ``access_type`` and ``access_level``.
    :param abort: a boolean value that indicates if an exception should
                  be raised when the rule is invalid.
    :return: Boolean. True only if both type and level are supported.
    :raises: exception.InvalidShareAccess for an unsupported type or
             exception.InvalidShareAccessLevel for an unsupported level,
             but only when ``abort`` is true.
    """

    errmsg = _("Unsupported access rule of 'type' %(access_type)s, "
               "'level' %(access_level)s, 'to' %(access_to)s: "
               "%(field)s should be one of %(supported)s.")

    # access_param only feeds message formatting and gets extra keys
    # ('field', 'supported') written into it below. Take a copy of a dict
    # rule so that those keys never leak into the caller's object.
    if not isinstance(access_rule, dict):
        # NOTE(review): assumes to_dict() hands back a dict that is safe
        # to modify -- confirm against the model implementation.
        access_param = access_rule.to_dict()
    else:
        access_param = dict(access_rule)

    def validate(field, supported_tokens, excinfo):
        # The value is read from the rule itself, not from access_param.
        if access_rule['access_%s' % field] in supported_tokens:
            return True

        access_param['field'] = field
        access_param['supported'] = ', '.join(
            "'%s'" % x for x in supported_tokens)
        if abort:
            LOG.error(errmsg, access_param)
            # excinfo names the exception class, the keyword it expects
            # and the template used to build that keyword's value.
            raise excinfo['type'](
                **{excinfo['about']: excinfo['details'] % access_param})
        else:
            LOG.warning(errmsg, access_param)
            return False

    valid = True
    # '&=' (not 'and') so that the level check runs even if the type
    # check has already failed.
    valid &= validate(
        'type', supported_access_types,
        {'type': exception.InvalidShareAccess, 'about': "reason",
         'details': _(
             "%(access_type)s; only %(supported)s access type is allowed")})
    valid &= validate(
        'level', supported_access_levels,
        {'type': exception.InvalidShareAccessLevel, 'about': "level",
         'details': "%(access_level)s"})

    return valid
def fixup_access_rule(access_rule):
    """Rewrite an access rule into the form the ganesha back end expects.

    The only adjustment made is replacing an ``access_to`` value of
    '0.0.0.0/0' with '0.0.0.0'. The rule is modified in place.

    :param access_rule: Access rule to be fixed up.
    :return: access_rule (the same object that was passed in).
    """
    if access_rule['access_to'] != '0.0.0.0/0':
        return access_rule

    access_rule['access_to'] = '0.0.0.0'  # nosec B104
    LOG.debug("Set access_to field to '0.0.0.0' in ganesha back end.")
    return access_rule