Coverage for manila/policy.py: 76%
86 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2011 OpenStack, LLC.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Policy Engine For Manila"""
18import functools
19import sys
21from oslo_config import cfg
22from oslo_log import log as logging
23from oslo_policy import policy
24from oslo_utils import excutils
26from manila import exception
27from manila import policies
# Global oslo.config object shared by the service.
CONF = cfg.CONF
# Logger named after this module.
LOG = logging.getLogger(__name__)
# Module-level policy enforcer; stays None until init() creates it and is
# set back to None by reset().
_ENFORCER = None
def reset():
    """Discard the module-level policy enforcer, if there is one.

    The enforcer is cleared before the reference is dropped, so the next
    call to :func:`init` builds a new one.
    """
    global _ENFORCER
    if not _ENFORCER:
        # Nothing has been initialised yet; nothing to tear down.
        return
    _ENFORCER.clear()
    _ENFORCER = None
def init(rules=None, use_conf=True, suppress_deprecation_warnings=False):
    """Create the module-level policy Enforcer on first use.

    If an enforcer already exists this is a no-op, so the arguments are only
    honoured by the call that actually builds the enforcer.

    :param rules: Default dictionary / Rules to use. It will be
                  considered just in the first instantiation.
    :param use_conf: Whether to load rules from config file.
    :param suppress_deprecation_warnings: Whether to suppress policy
                                          deprecation warnings.
    """
    global _ENFORCER
    if _ENFORCER:
        return

    _ENFORCER = enforcer = policy.Enforcer(CONF, rules=rules,
                                           use_conf=use_conf)

    # NOTE(gouthamr): Explicitly disable the warnings for policies
    # changing their default check_str. During
    # secure-rbac / policy-defaults-refresh work, all the policy
    # defaults have been changed and warning for each policy started
    # filling the log limits for various tools. Once we move to new
    # defaults only world then we can enable these warning again.
    enforcer.suppress_default_change_warnings = True

    # Suppressing deprecation warnings is fine for tests. However we
    # won't do it by default
    enforcer.suppress_deprecation_warnings = suppress_deprecation_warnings

    register_rules(enforcer)
def enforce(context, action, target, do_raise=True):
    """Verifies that the action is valid on the target in this context.

    **IMPORTANT** ONLY for use in API extensions. This method ignores
    unregistered rules and applies a default rule on them; there should
    be no unregistered rules in first party manila APIs.

    :param context: manila context
    :param action: string representing the action to be checked,
                   this should be colon separated for clarity.
                   i.e. ``share:create``,
    :param target: dictionary representing the object of the action
                   for object creation, this should be a dictionary
                   representing the location of the object e.g.
                   ``{'project_id': context.project_id}``
    :param do_raise: Whether to raise an exception if check fails.

    :returns: When ``do_raise`` is ``False``, returns a value that
              evaluates as ``True`` or ``False`` depending on whether
              the policy allows action on the target. A scope mismatch
              (``policy.InvalidScope``) is reported as ``False``.

    :raises: manila.exception.PolicyNotAuthorized if verification fails
             and ``do_raise`` is ``True``.

    """
    init()

    try:
        return _ENFORCER.enforce(action,
                                 target,
                                 context,
                                 do_raise=do_raise,
                                 exc=exception.PolicyNotAuthorized,
                                 action=action)
    except policy.InvalidScope:
        # A scope mismatch is an authorization failure like any other, so
        # it must honour ``do_raise`` too (authorize() does the same);
        # callers passing do_raise=False expect a falsy result, not an
        # exception.
        if do_raise:
            raise exception.PolicyNotAuthorized(action=action)
        return False
def set_rules(rules, overwrite=True, use_conf=False):
    """Install the given rules on the module-level enforcer.

    :param rules: New rules to use. It should be an instance of dict.
    :param overwrite: Whether to overwrite current rules or update them
                      with the new rules.
    :param use_conf: Whether to reload rules from config file.
    """
    # Make sure an enforcer exists. If one has to be created here, it is
    # created with use_conf=False regardless of the ``use_conf`` argument,
    # which is only forwarded to the enforcer's set_rules() below.
    init(use_conf=False)
    _ENFORCER.set_rules(rules, overwrite, use_conf)
def get_rules():
    """Return the current enforcer's rules.

    :returns: ``_ENFORCER.rules``, or ``None`` when no enforcer is set.
    """
    return _ENFORCER.rules if _ENFORCER else None
def register_rules(enforcer):
    """Register the rules from ``policies.list_rules()`` as defaults.

    :param enforcer: enforcer object whose ``register_defaults`` is called.
    """
    default_rules = policies.list_rules()
    enforcer.register_defaults(default_rules)
def get_enforcer():
    """Load the manila config and return the module-level enforcer.

    This method is for use by oslopolicy CLI scripts. Those scripts need the
    'output-file' and 'namespace' options, but having those in sys.argv
    means loading the Manila config options will fail as those are not
    expected to be present. So we pass in an arg list with those stripped
    out, in both the ``--opt value`` and the ``--opt=value`` spelling.

    :returns: the module-level enforcer, initialised if necessary.
    """
    # Options that belong to the oslopolicy scripts, not to manila.
    cli_only_opts = ('namespace', 'output-file')

    conf_args = []
    # Skip argv[0] because cfg.CONF expects the equivalent of sys.argv[1:]
    remaining = iter(sys.argv[1:])
    for arg in remaining:
        if arg.strip('-') in cli_only_opts:
            # "--opt value": drop the option and the value that follows it
            # (if the option is the last token there is nothing to skip).
            next(remaining, None)
            continue
        name, sep, _value = arg.partition('=')
        if sep and name.startswith('-') and name.strip('-') in cli_only_opts:
            # "--opt=value": option and value share a single token.
            continue
        conf_args.append(arg)

    cfg.CONF(conf_args, project='manila')
    init()
    return _ENFORCER
def authorize(context, action, target, do_raise=True, exc=None):
    """Check that ``action`` is permitted on ``target`` for ``context``.

    :param context: manila context
    :param action: string representing the action to be checked
                   this should be colon separated for clarity.
                   i.e. ``share:create``,
    :param target: dictionary representing the object of the action
                   for object creation this should be a dictionary
                   representing the location of the object e.g.
                   ``{'project_id': context.project_id}``. A falsy value
                   is replaced by ``default_target(context)``.
    :param do_raise: if True (the default), raises PolicyNotAuthorized;
                     if False, returns False
    :param exc: Class of the exception to raise if the check fails.
                It is handed to the enforcer together with
                ``action=action``. If not specified,
                :class:`PolicyNotAuthorized` will be used.

    :raises manila.exception.PolicyNotAuthorized: if verification fails
        and do_raise is True. Or if 'exc' is specified it will raise an
        exception of that type. A scope mismatch always raises
        PolicyNotAuthorized when do_raise is True.

    :return: returns a non-False value (not necessarily "True") if
        authorized, and the exact value False if not authorized and
        do_raise is False.
    """
    init()
    exc = exc or exception.PolicyNotAuthorized
    if not target:
        target = default_target(context)

    try:
        outcome = _ENFORCER.authorize(action, target, context,
                                      do_raise=do_raise, exc=exc,
                                      action=action)
    except policy.PolicyNotRegistered:
        # Log with traceback, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            LOG.exception('Policy not registered')
    except policy.InvalidScope:
        if not do_raise:
            return False
        raise exception.PolicyNotAuthorized(action=action)
    except Exception:
        # Any other failure: record who asked for what at debug level and
        # re-raise unchanged.
        with excutils.save_and_reraise_exception():
            LOG.debug('Policy check for %(action)s failed with credentials '
                      '%(credentials)s',
                      {'action': action,
                       'credentials': context.to_policy_values()})
    return outcome
def default_target(context):
    """Build a policy target from the context's own project and user ids.

    :param context: object exposing ``project_id`` and ``user_id``.
    :returns: dict with the keys ``project_id`` and ``user_id``.
    """
    return dict(project_id=context.project_id, user_id=context.user_id)
def check_is_admin(context):
    """Whether or not user is admin according to policy setting.

    Evaluates the ``context_is_admin`` rule without raising; the result of
    :func:`authorize` is returned as-is.
    """
    # The target is the user itself.
    return authorize(context, 'context_is_admin', default_target(context),
                     do_raise=False)
def check_is_host_admin(context):
    """Whether or not user is host admin according to policy setting.

    Evaluates the ``context_is_host_admin`` rule without raising; the
    result of :func:`authorize` is returned as-is.
    """
    # The target is the user itself.
    return authorize(context, 'context_is_host_admin',
                     default_target(context), do_raise=False)
def wrap_check_policy(resource):
    """Check policy corresponding to the wrapped methods prior to execution.

    The policy action checked is ``<resource>:<name of wrapped method>``,
    evaluated against the ``target_obj`` argument of the call.

    :param resource: resource prefix handed to :func:`check_policy`.
    :returns: a decorator for methods with the signature
              ``(self, context, target_obj, *args, **kwargs)``.
    """
    def decorator(method):
        @functools.wraps(method)
        def guarded(self, context, target_obj, *args, **kwargs):
            # The policy check runs first; the method only runs if the
            # check does not raise.
            check_policy(context, resource, method.__name__, target_obj)
            return method(self, context, target_obj, *args, **kwargs)

        return guarded

    return decorator
def check_policy(context, resource, action, target_obj=None, do_raise=True):
    """Authorize the ``<resource>:<action>`` policy for ``context``.

    :param context: manila context
    :param resource: first half of the policy name.
    :param action: second half of the policy name.
    :param target_obj: target of the check; a falsy value is replaced by
                       ``default_target(context)``.
    :param do_raise: forwarded to :func:`authorize`.
    :returns: whatever :func:`authorize` returns.
    """
    policy_name = '{}:{}'.format(resource, action)
    if not target_obj:
        target_obj = default_target(context)
    return authorize(context, policy_name, target_obj, do_raise=do_raise)