Coverage for manila/policy.py: 76%

86 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2011 OpenStack, LLC. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Policy Engine For Manila""" 

17 

18import functools 

19import sys 

20 

21from oslo_config import cfg 

22from oslo_log import log as logging 

23from oslo_policy import policy 

24from oslo_utils import excutils 

25 

26from manila import exception 

27from manila import policies 

28 

# oslo.config's global configuration object.
CONF = cfg.CONF

# Logger named after this module.
LOG = logging.getLogger(__name__)

# Module-wide policy enforcer. It starts out unset, is created on demand by
# init() and is dropped again by reset().
_ENFORCER = None

32 

33 

def reset():
    """Tear down the module-wide policy enforcer.

    When an enforcer exists it is cleared and the module-level reference
    to it is dropped, so the next ``init()`` call builds a new one. When
    no enforcer exists this is a no-op.
    """
    global _ENFORCER
    if not _ENFORCER:
        # Nothing has been initialised yet, so there is nothing to undo.
        return
    _ENFORCER.clear()
    _ENFORCER = None

39 

40 

def init(rules=None, use_conf=True, suppress_deprecation_warnings=False):
    """Create the module-wide policy enforcer if it does not exist yet.

    Subsequent calls are no-ops while an enforcer is present, so the
    arguments only take effect on the call that actually creates it.

    :param rules: Default dictionary / Rules to use. Only considered on
                  the first instantiation.
    :param use_conf: Whether to load rules from config file.
    :param suppress_deprecation_warnings: Whether to suppress policy
                                          deprecation warnings.
    """
    global _ENFORCER
    if _ENFORCER:
        # Already initialised; keep the existing enforcer untouched.
        return

    _ENFORCER = policy.Enforcer(CONF, rules=rules, use_conf=use_conf)

    # NOTE(gouthamr): Warnings about policies changing their default
    # check_str are explicitly disabled. During the secure-rbac /
    # policy-defaults-refresh work every policy default changed, and a
    # warning per policy started filling the log limits of various tools.
    # These warnings can be re-enabled once only the new defaults remain.
    _ENFORCER.suppress_default_change_warnings = True

    # Suppressing deprecation warnings is fine for tests, but it is not
    # done by default.
    _ENFORCER.suppress_deprecation_warnings = suppress_deprecation_warnings

    register_rules(_ENFORCER)

71 

72 

def enforce(context, action, target, do_raise=True):
    """Verifies that the action is valid on the target in this context.

    **IMPORTANT** ONLY for use in API extensions. This method ignores
    unregistered rules and applies a default rule on them; there should
    be no unregistered rules in first party manila APIs.

    :param context: manila context
    :param action: string representing the action to be checked,
                   this should be colon separated for clarity,
                   i.e. ``share:create``.
    :param target: dictionary representing the object of the action;
                   for object creation, this should be a dictionary
                   representing the location of the object, e.g.
                   ``{'project_id': context.project_id}``
    :param do_raise: Whether to raise an exception if the check fails.

    :returns: When ``do_raise`` is ``False``, returns a value that
              evaluates as ``True`` or ``False`` depending on whether
              the policy allows action on the target. A scope mismatch
              reported by the enforcer is treated as a failed check and
              yields ``False``.

    :raises: manila.exception.PolicyNotAuthorized if verification fails
             and ``do_raise`` is ``True``.
    """
    init()

    try:
        return _ENFORCER.enforce(action,
                                 target,
                                 context,
                                 do_raise=do_raise,
                                 exc=exception.PolicyNotAuthorized,
                                 action=action)
    except policy.InvalidScope:
        # An invalid scope is just another way of failing the check, so
        # honour ``do_raise`` exactly as authorize() does instead of
        # raising unconditionally.
        if do_raise:
            raise exception.PolicyNotAuthorized(action=action)
        return False

108 

109 

def set_rules(rules, overwrite=True, use_conf=False):
    """Install the given rules on the module-wide enforcer.

    :param rules: New rules to use. It should be an instance of dict.
    :param overwrite: Whether to overwrite current rules or update them
                      with the new rules.
    :param use_conf: Whether to reload rules from config file.
    """
    # Make sure an enforcer exists. If this call is the one that creates
    # it, it is created with use_conf=False regardless of the ``use_conf``
    # argument, which is only forwarded to the enforcer's set_rules().
    init(use_conf=False)

    _ENFORCER.set_rules(rules, overwrite, use_conf)

121 

122 

def get_rules():
    """Return the rules held by the current enforcer.

    :returns: the enforcer's ``rules`` attribute, or ``None`` when no
              enforcer has been initialised.
    """
    return _ENFORCER.rules if _ENFORCER else None

126 

127 

def register_rules(enforcer):
    """Register manila's policy rules as defaults on ``enforcer``.

    :param enforcer: enforcer object exposing ``register_defaults``.
    """
    default_rules = policies.list_rules()
    enforcer.register_defaults(default_rules)

130 

131 

def get_enforcer():
    """Return the policy enforcer for use by the oslopolicy CLI scripts.

    Those scripts need the 'output-file' and 'namespace' options, but
    having them in ``sys.argv`` means loading the Manila config options
    will fail, as they are not expected to be present. The configuration
    is therefore loaded from a copy of the argument list with those
    options, and their values, stripped out.

    Both spellings are stripped: ``--namespace manila`` (value in the
    following argument) and ``--namespace=manila`` (value attached).

    :returns: the initialised module-wide enforcer.
    """
    cli_only_opts = ('namespace', 'output-file')
    conf_args = []

    # Start at 1 because cfg.CONF expects the equivalent of sys.argv[1:].
    remaining = iter(sys.argv[1:])
    for arg in remaining:
        opt_name, has_inline_value, _ = arg.partition('=')
        if opt_name.strip('-') in cli_only_opts:
            if not has_inline_value:
                # The value is the next argument; drop it as well. The
                # default keeps a trailing, value-less option from raising.
                next(remaining, None)
            continue
        conf_args.append(arg)

    cfg.CONF(conf_args, project='manila')
    init()
    return _ENFORCER

150 

151 

def authorize(context, action, target, do_raise=True, exc=None):
    """Verifies that the action is valid on the target in this context.

    :param context: manila context
    :param action: string representing the action to be checked;
                   this should be colon separated for clarity,
                   i.e. ``share:create``.
    :param target: dictionary representing the object of the action;
                   for object creation this should be a dictionary
                   representing the location of the object, e.g.
                   ``{'project_id': context.project_id}``. A falsy value
                   is replaced by ``default_target(context)``.
    :param do_raise: if True (the default), raises on a failed check;
                     if False, returns False
    :param exc: Class of the exception to raise if the check fails. It is
                handed to the enforcer together with ``action``. If not
                specified, :class:`PolicyNotAuthorized` will be used.

    :raises manila.exception.PolicyNotAuthorized: if verification fails
        and do_raise is True, or an exception of type ``exc`` when that
        is specified. A scope mismatch always surfaces as
        ``PolicyNotAuthorized`` when do_raise is True.

    :return: returns a non-False value (not necessarily "True") if
        authorized, and the exact value False if not authorized and
        do_raise is False.
    """
    init()
    exc = exc or exception.PolicyNotAuthorized
    if not target:
        target = default_target(context)

    try:
        result = _ENFORCER.authorize(
            action, target, context,
            do_raise=do_raise, exc=exc, action=action)
    except policy.PolicyNotRegistered:
        # Log with traceback, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            LOG.exception('Policy not registered')
    except policy.InvalidScope:
        if not do_raise:
            return False
        raise exception.PolicyNotAuthorized(action=action)
    except Exception:
        # Record who failed which check, then re-raise unchanged.
        with excutils.save_and_reraise_exception():
            credentials = context.to_policy_values()
            LOG.debug('Policy check for %(action)s failed with credentials '
                      '%(credentials)s',
                      {'action': action, 'credentials': credentials})
    return result

203 

204 

def default_target(context):
    """Build the default policy target for ``context``.

    :param context: object exposing ``project_id`` and ``user_id``.
    :returns: dict with the context's own ``project_id`` and ``user_id``.
    """
    return dict(project_id=context.project_id, user_id=context.user_id)

207 

208 

def check_is_admin(context):
    """Whether or not user is admin according to policy setting.

    Evaluates the ``context_is_admin`` rule without raising.

    :param context: manila context
    :returns: the result of ``authorize`` called with ``do_raise=False``.
    """
    # The target is the user themselves.
    return authorize(context,
                     'context_is_admin',
                     default_target(context),
                     do_raise=False)

216 

217 

def check_is_host_admin(context):
    """Whether or not user is host admin according to policy setting.

    Evaluates the ``context_is_host_admin`` rule without raising.

    :param context: manila context
    :returns: the result of ``authorize`` called with ``do_raise=False``.
    """
    # The target is the user themselves.
    return authorize(context,
                     'context_is_host_admin',
                     default_target(context),
                     do_raise=False)

225 

226 

def wrap_check_policy(resource):
    """Check policy corresponding to the wrapped methods prior to execution.

    The policy action checked is ``<resource>:<wrapped method name>``, and
    the method's ``target_obj`` argument is used as the policy target.

    :param resource: resource name used as the prefix of the policy action.
    :returns: a decorator for methods taking ``(self, context, target_obj,
              ...)``.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapped(self, context, target_obj, *args, **kwargs):
            # The policy check runs first; the method only executes if
            # the check does not raise.
            check_policy(context, resource, func.__name__, target_obj)
            return func(self, context, target_obj, *args, **kwargs)

        return wrapped

    return decorator

237 

238 

def check_policy(context, resource, action, target_obj=None, do_raise=True):
    """Authorize ``<resource>:<action>`` for ``context``.

    :param context: manila context
    :param resource: resource part of the policy name.
    :param action: action part of the policy name.
    :param target_obj: policy target; a falsy value is replaced by
                       ``default_target(context)``.
    :param do_raise: forwarded to ``authorize``.
    :returns: whatever ``authorize`` returns.
    """
    if not target_obj:
        target_obj = default_target(context)
    policy_name = '%s:%s' % (resource, action)
    return authorize(context, policy_name, target_obj, do_raise=do_raise)