Coverage for manila/tests/test_policy.py: 100%
121 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2011 Piston Cloud Computing, Inc.
2# All Rights Reserved.
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Test of Policy Engine For Manila."""
18import ddt
19from oslo_config import cfg
20from oslo_policy import policy as common_policy
22from manila import context
23from manila import exception
24from manila import policy
25from manila import test
27CONF = cfg.CONF
30@ddt.ddt
31class PolicyTestCase(test.TestCase):
32 def setUp(self):
33 super(PolicyTestCase, self).setUp()
34 rules = [
35 common_policy.RuleDefault("true", '@'),
36 common_policy.RuleDefault("test:allowed", '@'),
37 common_policy.RuleDefault("test:denied", "!"),
38 common_policy.RuleDefault("test:my_file",
39 "role:compute_admin or "
40 "project_id:%(project_id)s"),
41 common_policy.RuleDefault("test:early_and_fail", "! and @"),
42 common_policy.RuleDefault("test:early_or_success", "@ or !"),
43 common_policy.RuleDefault("test:lowercase_admin",
44 "role:admin"),
45 common_policy.RuleDefault("test:uppercase_admin",
46 "role:ADMIN"),
47 ]
48 policy.reset()
49 policy.init(suppress_deprecation_warnings=True)
50 # before a policy rule can be used, its default has to be registered.
51 policy._ENFORCER.register_defaults(rules)
52 self.context = context.RequestContext('fake', 'fake', roles=['member'])
53 self.target = {}
54 self.addCleanup(policy.reset)
56 def test_authorize_nonexistent_action_throws(self):
57 action = "test:noexist"
58 self.assertRaises(common_policy.PolicyNotRegistered, policy.authorize,
59 self.context, action, self.target)
61 def test_authorize_bad_action_throws(self):
62 action = "test:denied"
63 self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
64 self.context, action, self.target)
66 def test_authorize_bad_action_noraise(self):
67 action = "test:denied"
68 result = policy.authorize(self.context, action, self.target, False)
69 self.assertFalse(result)
71 def test_authorize_good_action(self):
72 action = "test:allowed"
73 result = policy.authorize(self.context, action, self.target)
74 self.assertTrue(result)
76 def test_templatized_authorization(self):
77 target_mine = {'project_id': 'fake'}
78 target_not_mine = {'project_id': 'another'}
79 action = "test:my_file"
80 policy.authorize(self.context, action, target_mine)
81 self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
82 self.context, action, target_not_mine)
84 def test_early_AND_authorization(self):
85 action = "test:early_and_fail"
86 self.assertRaises(exception.PolicyNotAuthorized, policy.authorize,
87 self.context, action, self.target)
89 def test_early_OR_authorization(self):
90 action = "test:early_or_success"
91 policy.authorize(self.context, action, self.target)
93 def test_ignore_case_role_check(self):
94 lowercase_action = "test:lowercase_admin"
95 uppercase_action = "test:uppercase_admin"
96 admin_context = context.RequestContext('admin',
97 'fake',
98 roles=['AdMiN'])
99 policy.authorize(admin_context, lowercase_action, self.target)
100 policy.authorize(admin_context, uppercase_action, self.target)
102 @ddt.data('enforce', 'authorize')
103 def test_authorize_properly_handles_invalid_scope_exception(self, method):
104 self.fixture.config(enforce_scope=True, group='oslo_policy')
105 project_context = context.RequestContext(project_id='fake-project-id',
106 roles=['bar'])
107 policy.reset()
108 policy.init(suppress_deprecation_warnings=True)
109 rule = common_policy.RuleDefault('foo', 'role:bar',
110 scope_types=['system'])
111 policy._ENFORCER.register_defaults([rule])
113 self.assertRaises(exception.PolicyNotAuthorized,
114 getattr(policy, method),
115 project_context, 'foo', {})
117 @ddt.data('enforce', 'authorize')
118 def test_authorize_does_not_raise_forbidden(self, method):
119 self.fixture.config(enforce_scope=False, group='oslo_policy')
120 project_context = context.RequestContext(project_id='fake-project-id',
121 roles=['bar'])
122 policy.reset()
123 policy.init(suppress_deprecation_warnings=True)
124 rule = common_policy.RuleDefault('foo', 'role:bar',
125 scope_types=['system'])
126 policy._ENFORCER.register_defaults([rule])
128 self.assertTrue(getattr(policy, method)(project_context, 'foo', {}))
131class DefaultPolicyTestCase(test.TestCase):
132 """This test case calls into the "enforce" method in policy
134 enforce() in contrast with authorize() allows "default" rules to apply
135 to policies that have not been registered.
136 """
138 def setUp(self):
139 super(DefaultPolicyTestCase, self).setUp()
140 policy.reset()
141 policy.init(suppress_deprecation_warnings=True)
143 self.rules = {
144 "default": [],
145 "example:exist": "false:false"
146 }
147 self._set_rules('default')
148 self.context = context.RequestContext('fake', 'fake')
150 def tearDown(self):
151 super(DefaultPolicyTestCase, self).tearDown()
152 policy.reset()
154 def _set_rules(self, default_rule):
155 these_rules = common_policy.Rules.from_dict(self.rules,
156 default_rule=default_rule)
157 policy._ENFORCER.set_rules(these_rules)
159 def test_policy_called(self):
160 self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
161 self.context, "example:exist", {})
163 def test_not_found_policy_calls_default(self):
164 policy.enforce(self.context, "example:noexist", {})
166 def test_default_not_found(self):
167 new_default_rule = "default_noexist"
168 # FIXME(gyee): need to overwrite the Enforcer's default_rule first
169 # as it is recreating the rules with its own default_rule instead
170 # of the default_rule passed in from set_rules(). I think this is a
171 # bug in Oslo policy.
172 policy._ENFORCER.default_rule = new_default_rule
173 self._set_rules(new_default_rule)
174 self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
175 self.context, "example:noexist", {})
178class ContextIsAdminPolicyTestCase(test.TestCase):
180 def setUp(self):
181 super(ContextIsAdminPolicyTestCase, self).setUp()
182 policy.reset()
183 policy.init(suppress_deprecation_warnings=True)
185 def _set_rules(self, rules, default_rule):
186 these_rules = common_policy.Rules.from_dict(rules,
187 default_rule=default_rule)
188 policy._ENFORCER.set_rules(these_rules)
190 def test_default_admin_role_is_admin(self):
191 ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
192 self.assertFalse(ctx.is_admin)
193 ctx = context.RequestContext('fake', 'fake', roles=['admin'])
194 self.assertTrue(ctx.is_admin)
196 def test_custom_admin_role_is_admin(self):
197 # define explicit rules for context_is_admin
198 rules = {
199 'context_is_admin': [["role:administrator"], ["role:johnny-admin"]]
200 }
201 self._set_rules(rules, CONF.oslo_policy.policy_default_rule)
202 ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
203 self.assertTrue(ctx.is_admin)
204 ctx = context.RequestContext('fake', 'fake', roles=['administrator'])
205 self.assertTrue(ctx.is_admin)
206 # default rule no longer applies
207 ctx = context.RequestContext('fake', 'fake', roles=['admin'])
208 self.assertFalse(ctx.is_admin)
210 def test_context_is_admin_undefined(self):
211 rules = {
212 "admin_or_owner": "role:admin or project_id:%(project_id)s",
213 "default": "rule:admin_or_owner",
214 }
215 self._set_rules(rules, CONF.oslo_policy.policy_default_rule)
216 ctx = context.RequestContext('fake', 'fake')
217 self.assertTrue(ctx.is_admin)
218 ctx = context.RequestContext('fake', 'fake', roles=['admin'])
219 self.assertTrue(ctx.is_admin)