Coverage for manila/tests/test_policy.py: 100%

121 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2011 Piston Cloud Computing, Inc. 

2# All Rights Reserved. 

3 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Test of Policy Engine For Manila.""" 

17 

18import ddt 

19from oslo_config import cfg 

20from oslo_policy import policy as common_policy 

21 

22from manila import context 

23from manila import exception 

24from manila import policy 

25from manila import test 

26 

27CONF = cfg.CONF 

28 

29 

30@ddt.ddt 

31class PolicyTestCase(test.TestCase): 

32 def setUp(self): 

33 super(PolicyTestCase, self).setUp() 

34 rules = [ 

35 common_policy.RuleDefault("true", '@'), 

36 common_policy.RuleDefault("test:allowed", '@'), 

37 common_policy.RuleDefault("test:denied", "!"), 

38 common_policy.RuleDefault("test:my_file", 

39 "role:compute_admin or " 

40 "project_id:%(project_id)s"), 

41 common_policy.RuleDefault("test:early_and_fail", "! and @"), 

42 common_policy.RuleDefault("test:early_or_success", "@ or !"), 

43 common_policy.RuleDefault("test:lowercase_admin", 

44 "role:admin"), 

45 common_policy.RuleDefault("test:uppercase_admin", 

46 "role:ADMIN"), 

47 ] 

48 policy.reset() 

49 policy.init(suppress_deprecation_warnings=True) 

50 # before a policy rule can be used, its default has to be registered. 

51 policy._ENFORCER.register_defaults(rules) 

52 self.context = context.RequestContext('fake', 'fake', roles=['member']) 

53 self.target = {} 

54 self.addCleanup(policy.reset) 

55 

56 def test_authorize_nonexistent_action_throws(self): 

57 action = "test:noexist" 

58 self.assertRaises(common_policy.PolicyNotRegistered, policy.authorize, 

59 self.context, action, self.target) 

60 

61 def test_authorize_bad_action_throws(self): 

62 action = "test:denied" 

63 self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, 

64 self.context, action, self.target) 

65 

66 def test_authorize_bad_action_noraise(self): 

67 action = "test:denied" 

68 result = policy.authorize(self.context, action, self.target, False) 

69 self.assertFalse(result) 

70 

71 def test_authorize_good_action(self): 

72 action = "test:allowed" 

73 result = policy.authorize(self.context, action, self.target) 

74 self.assertTrue(result) 

75 

76 def test_templatized_authorization(self): 

77 target_mine = {'project_id': 'fake'} 

78 target_not_mine = {'project_id': 'another'} 

79 action = "test:my_file" 

80 policy.authorize(self.context, action, target_mine) 

81 self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, 

82 self.context, action, target_not_mine) 

83 

84 def test_early_AND_authorization(self): 

85 action = "test:early_and_fail" 

86 self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, 

87 self.context, action, self.target) 

88 

89 def test_early_OR_authorization(self): 

90 action = "test:early_or_success" 

91 policy.authorize(self.context, action, self.target) 

92 

93 def test_ignore_case_role_check(self): 

94 lowercase_action = "test:lowercase_admin" 

95 uppercase_action = "test:uppercase_admin" 

96 admin_context = context.RequestContext('admin', 

97 'fake', 

98 roles=['AdMiN']) 

99 policy.authorize(admin_context, lowercase_action, self.target) 

100 policy.authorize(admin_context, uppercase_action, self.target) 

101 

102 @ddt.data('enforce', 'authorize') 

103 def test_authorize_properly_handles_invalid_scope_exception(self, method): 

104 self.fixture.config(enforce_scope=True, group='oslo_policy') 

105 project_context = context.RequestContext(project_id='fake-project-id', 

106 roles=['bar']) 

107 policy.reset() 

108 policy.init(suppress_deprecation_warnings=True) 

109 rule = common_policy.RuleDefault('foo', 'role:bar', 

110 scope_types=['system']) 

111 policy._ENFORCER.register_defaults([rule]) 

112 

113 self.assertRaises(exception.PolicyNotAuthorized, 

114 getattr(policy, method), 

115 project_context, 'foo', {}) 

116 

117 @ddt.data('enforce', 'authorize') 

118 def test_authorize_does_not_raise_forbidden(self, method): 

119 self.fixture.config(enforce_scope=False, group='oslo_policy') 

120 project_context = context.RequestContext(project_id='fake-project-id', 

121 roles=['bar']) 

122 policy.reset() 

123 policy.init(suppress_deprecation_warnings=True) 

124 rule = common_policy.RuleDefault('foo', 'role:bar', 

125 scope_types=['system']) 

126 policy._ENFORCER.register_defaults([rule]) 

127 

128 self.assertTrue(getattr(policy, method)(project_context, 'foo', {})) 

129 

130 

131class DefaultPolicyTestCase(test.TestCase): 

132 """This test case calls into the "enforce" method in policy 

133 

134 enforce() in contrast with authorize() allows "default" rules to apply 

135 to policies that have not been registered. 

136 """ 

137 

138 def setUp(self): 

139 super(DefaultPolicyTestCase, self).setUp() 

140 policy.reset() 

141 policy.init(suppress_deprecation_warnings=True) 

142 

143 self.rules = { 

144 "default": [], 

145 "example:exist": "false:false" 

146 } 

147 self._set_rules('default') 

148 self.context = context.RequestContext('fake', 'fake') 

149 

150 def tearDown(self): 

151 super(DefaultPolicyTestCase, self).tearDown() 

152 policy.reset() 

153 

154 def _set_rules(self, default_rule): 

155 these_rules = common_policy.Rules.from_dict(self.rules, 

156 default_rule=default_rule) 

157 policy._ENFORCER.set_rules(these_rules) 

158 

159 def test_policy_called(self): 

160 self.assertRaises(exception.PolicyNotAuthorized, policy.enforce, 

161 self.context, "example:exist", {}) 

162 

163 def test_not_found_policy_calls_default(self): 

164 policy.enforce(self.context, "example:noexist", {}) 

165 

166 def test_default_not_found(self): 

167 new_default_rule = "default_noexist" 

168 # FIXME(gyee): need to overwrite the Enforcer's default_rule first 

169 # as it is recreating the rules with its own default_rule instead 

170 # of the default_rule passed in from set_rules(). I think this is a 

171 # bug in Oslo policy. 

172 policy._ENFORCER.default_rule = new_default_rule 

173 self._set_rules(new_default_rule) 

174 self.assertRaises(exception.PolicyNotAuthorized, policy.enforce, 

175 self.context, "example:noexist", {}) 

176 

177 

178class ContextIsAdminPolicyTestCase(test.TestCase): 

179 

180 def setUp(self): 

181 super(ContextIsAdminPolicyTestCase, self).setUp() 

182 policy.reset() 

183 policy.init(suppress_deprecation_warnings=True) 

184 

185 def _set_rules(self, rules, default_rule): 

186 these_rules = common_policy.Rules.from_dict(rules, 

187 default_rule=default_rule) 

188 policy._ENFORCER.set_rules(these_rules) 

189 

190 def test_default_admin_role_is_admin(self): 

191 ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin']) 

192 self.assertFalse(ctx.is_admin) 

193 ctx = context.RequestContext('fake', 'fake', roles=['admin']) 

194 self.assertTrue(ctx.is_admin) 

195 

196 def test_custom_admin_role_is_admin(self): 

197 # define explicit rules for context_is_admin 

198 rules = { 

199 'context_is_admin': [["role:administrator"], ["role:johnny-admin"]] 

200 } 

201 self._set_rules(rules, CONF.oslo_policy.policy_default_rule) 

202 ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin']) 

203 self.assertTrue(ctx.is_admin) 

204 ctx = context.RequestContext('fake', 'fake', roles=['administrator']) 

205 self.assertTrue(ctx.is_admin) 

206 # default rule no longer applies 

207 ctx = context.RequestContext('fake', 'fake', roles=['admin']) 

208 self.assertFalse(ctx.is_admin) 

209 

210 def test_context_is_admin_undefined(self): 

211 rules = { 

212 "admin_or_owner": "role:admin or project_id:%(project_id)s", 

213 "default": "rule:admin_or_owner", 

214 } 

215 self._set_rules(rules, CONF.oslo_policy.policy_default_rule) 

216 ctx = context.RequestContext('fake', 'fake') 

217 self.assertTrue(ctx.is_admin) 

218 ctx = context.RequestContext('fake', 'fake', roles=['admin']) 

219 self.assertTrue(ctx.is_admin)