Coverage for manila/context.py: 93%
68 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2011 OpenStack LLC.
2# Copyright 2010 United States Government as represented by the
3# Administrator of the National Aeronautics and Space Administration.
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
18"""RequestContext: context for requests that persist through all of manila."""
20import copy
22from oslo_context import context
23from oslo_db.sqlalchemy import enginefacade
24from oslo_utils import timeutils
26from manila.i18n import _
27from manila import policy
@enginefacade.transaction_context_provider
class RequestContext(context.RequestContext):
    """Security context and request information.

    Represents the user taking a given action within the system.

    In addition to whatever the ``oslo_context`` base class stores, an
    instance carries ``project_name``, ``is_service``, ``read_deleted``,
    ``remote_address``, ``timestamp``, ``quota_class`` and
    ``service_catalog``.
    """

    def __init__(self, user_id=None, project_id=None, is_admin=None,
                 read_deleted="no", project_name=None, remote_address=None,
                 timestamp=None, quota_class=None, service_catalog=None,
                 **kwargs):
        """Initialize RequestContext.

        :param user_id: forwarded to the base class constructor.
        :param project_id: forwarded to the base class constructor.
        :param is_admin: when None, the flag is computed with
            ``policy.check_is_admin``; when truthy, the 'admin' role is
            added to ``roles`` if it is not already there.
        :param read_deleted: 'no' indicates deleted records are hidden, 'yes'
            indicates deleted records are visible, 'only' indicates that
            *only* deleted records are visible.
        :param project_name: stored unchanged on the context.
        :param remote_address: stored unchanged on the context.
        :param timestamp: a falsy value means "now" (``timeutils.utcnow()``);
            a string is parsed with ``timeutils.parse_isotime``; any other
            value is stored unchanged.
        :param quota_class: stored unchanged on the context.
        :param service_catalog: iterable of dict-like entries; only entries
            whose 'type' is 'compute' or 'volume' are kept. A falsy value
            yields an empty list.
        :param kwargs: Extra arguments passed transparently to
            oslo_context.RequestContext.
        :raises ValueError: if ``read_deleted`` is not 'no', 'yes' or 'only'.
        """
        kwargs.setdefault('user_id', user_id)
        kwargs.setdefault('project_id', project_id)

        super().__init__(is_admin=is_admin, **kwargs)

        self.project_name = project_name
        if self.is_admin is None:
            self.is_admin = policy.check_is_admin(self)
        elif self.is_admin and 'admin' not in self.roles:
            # Rebind to a fresh list instead of appending in place, so a
            # ``roles`` list supplied by the caller is never mutated
            # (assumes the base class may keep the caller's list by
            # reference -- confirm against oslo.context).
            self.roles = list(self.roles) + ['admin']
        # a "service" user's token will contain "service_roles"
        self.is_service = kwargs.get('service_roles') or False
        self.read_deleted = read_deleted
        self.remote_address = remote_address
        if not timestamp:
            timestamp = timeutils.utcnow()
        elif isinstance(timestamp, str):
            timestamp = timeutils.parse_isotime(timestamp)
        self.timestamp = timestamp
        self.quota_class = quota_class
        if service_catalog:
            self.service_catalog = [s for s in service_catalog
                                    if s.get('type') in ('compute', 'volume')]
        else:
            self.service_catalog = []

    def _get_read_deleted(self):
        """Return the current ``read_deleted`` mode."""
        return self._read_deleted

    def _set_read_deleted(self, read_deleted):
        """Validate and store the ``read_deleted`` mode.

        :raises ValueError: if the value is not 'no', 'yes' or 'only'.
        """
        if read_deleted not in ('no', 'yes', 'only'):
            raise ValueError(_("read_deleted can only be one of 'no', "
                               "'yes' or 'only', not %r") % read_deleted)
        self._read_deleted = read_deleted

    def _del_read_deleted(self):
        """Remove the stored ``read_deleted`` mode."""
        del self._read_deleted

    # Expose the validated mode as a regular attribute.
    read_deleted = property(_get_read_deleted, _set_read_deleted,
                            _del_read_deleted)

    def to_dict(self):
        """Return the base class dict extended with manila's fields.

        ``timestamp`` is emitted as an ISO-format string via
        ``isoformat()``; the other values are copied as they are stored.
        """
        values = super().to_dict()
        values['user_id'] = self.user_id
        values['project_id'] = self.project_id
        values['project_name'] = self.project_name
        values['domain_id'] = self.domain_id
        values['read_deleted'] = self.read_deleted
        values['remote_address'] = self.remote_address
        values['timestamp'] = self.timestamp.isoformat()
        values['quota_class'] = self.quota_class
        values['service_catalog'] = self.service_catalog
        values['request_id'] = self.request_id
        return values

    @classmethod
    def from_dict(cls, values):
        """Build a context from a dict of values.

        Each listed key is read with ``values.get`` and handed to the base
        class ``from_dict`` as a keyword argument; a missing key becomes
        None, except ``read_deleted`` which falls back to 'no'.

        :param values: mapping of context fields, e.g. the output of
            ``to_dict``.
        """
        return super().from_dict(
            values,
            user_id=values.get('user_id'),
            project_id=values.get('project_id'),
            project_name=values.get('project_name'),
            domain_id=values.get('domain_id'),
            read_deleted=values.get('read_deleted', 'no'),
            remote_address=values.get('remote_address'),
            timestamp=values.get('timestamp'),
            quota_class=values.get('quota_class'),
            service_catalog=values.get('service_catalog'),
            request_id=values.get('request_id'),
            is_admin=values.get('is_admin'),
            roles=values.get('roles'),
            auth_token=values.get('auth_token'),
            user_domain_id=values.get('user_domain_id'),
            project_domain_id=values.get('project_domain_id')
        )

    def elevated(self, read_deleted=None, overwrite=False):
        """Return a version of this context with admin flag set.

        The result is a deep copy; this context is left unchanged.

        :param read_deleted: when not None, replaces ``read_deleted`` on
            the copy.
        :param overwrite: accepted but not used by this method.
        :returns: the elevated copy, with 'admin' among its roles.
        """
        ctx = copy.deepcopy(self)
        ctx.is_admin = True

        if 'admin' not in ctx.roles:
            ctx.roles.append('admin')

        if read_deleted is not None:
            ctx.read_deleted = read_deleted

        return ctx

    def to_policy_values(self):
        """Return the base class policy values plus ``is_admin``."""
        # Named ``values`` so the module-level ``policy`` import is not
        # shadowed inside this method.
        values = super().to_policy_values()
        values['is_admin'] = self.is_admin
        return values
def get_admin_context(read_deleted="no"):
    """Create an admin context that is not tied to any user or project.

    :param read_deleted: one of 'no', 'yes' or 'only'; handed to
        RequestContext unchanged.
    :returns: a new RequestContext built with ``is_admin=True``.
    """
    admin_kwargs = {
        'user_id': None,
        'project_id': None,
        'is_admin': True,
        'read_deleted': read_deleted,
        # Not a named RequestContext parameter, so it travels through
        # **kwargs to the oslo_context base class.
        'overwrite': False,
    }
    return RequestContext(**admin_kwargs)