Coverage for manila/keymgr/barbican.py: 59%
123 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2025 Cloudification GmbH.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import itertools
18from castellan.key_manager import barbican_key_manager
19from castellan import options as castellan_options
20from keystoneauth1 import loading as ks_loading
21from keystoneauth1 import session as ks_session
22from keystoneclient.v3 import client as ks_client
23from oslo_config import cfg
24from oslo_log import log as logging
25from oslo_utils import uuidutils
27from manila.common import client_auth
28from manila import exception
31BARBICAN_GROUP = 'barbican'
33CONF = cfg.CONF
34LOG = logging.getLogger(__name__)
36castellan_options.set_defaults(CONF)
37ks_loading.register_auth_conf_options(CONF, BARBICAN_GROUP)
40BARBICAN_OPTS = [
41 cfg.StrOpt('endpoint_type',
42 default='publicURL',
43 choices=['publicURL', 'internalURL', 'adminURL',
44 'public', 'internal', 'admin'],
45 help='Endpoint type to be used with keystone client calls.'),
46 cfg.StrOpt('region_name',
47 help='Region name for connecting to keystone for '
48 'application credential management.'),
49]
51CONF.register_opts(BARBICAN_OPTS, BARBICAN_GROUP)
54def list_opts():
55 # NOTE(tkajinam): This likely breaks when castellan fixes missing auth
56 # plugin options
57 return itertools.chain(
58 [(BARBICAN_GROUP, BARBICAN_OPTS)],
59 client_auth.AuthClientLoader.list_opts(BARBICAN_GROUP)
60 )
63def _require_barbican_key_manager_backend(conf):
64 backend = conf.key_manager.backend
65 if backend is None: 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 LOG.warning("The BarbicanKeyManager backend should be explicitly "
67 "used for share encryption.")
68 raise exception.ManilaBarbicanACLError()
70 backend = backend.split('.')[-1]
71 if backend not in ('barbican', 'BarbicanKeyManager'):
72 LOG.warning("The '%s' key_manager backend is not supported. Please"
73 " use barbican as key_manager.", backend)
74 raise exception.ManilaBarbicanACLError()
77class BarbicanSecretACL(barbican_key_manager.BarbicanKeyManager):
79 def get_client_and_href(self, context, secret_ref):
80 """Get user barbican client and a secret href"""
81 _require_barbican_key_manager_backend(self.conf)
83 if not secret_ref:
84 LOG.error("Missing secret_ref provided in current user context.")
85 raise exception.ManilaBarbicanACLError()
87 # Establish a Barbican client session of current user and keystone
88 # session of barbican user to get its user_id. Grant ACL to barbican
89 # user that it will be used for the key_ref handover process.
90 try:
91 user_barbican_client, base_url = self._get_barbican_client(context)
92 secret_ref = self._create_secret_ref(base_url, secret_ref)
93 except Exception as e:
94 LOG.error("Failed to create barbican client. Error: %s", e)
95 raise exception.ManilaBarbicanACLError()
97 return user_barbican_client, secret_ref
99 def _get_barbican_user_id(self):
100 barbican_auth = ks_loading.load_auth_from_conf_options(
101 self.conf, BARBICAN_GROUP)
102 barbican_sess = ks_session.Session(auth=barbican_auth)
103 barbican_ks_client = ks_client.Client(
104 session=barbican_sess,
105 interface=self.conf.barbican.endpoint_type,
106 region_name=self.conf.barbican.region_name)
107 return barbican_ks_client.session.get_user_id()
109 def create_secret_access(self, context, secret_ref):
110 try:
111 user_barbican_client, secret_href = self.get_client_and_href(
112 context, secret_ref)
113 barbican_user_id = self._get_barbican_user_id()
114 # Create a Barbican ACL so the barbican user can access it.
115 acl = user_barbican_client.acls.create(entity_ref=secret_href,
116 users=[barbican_user_id],
117 project_access=False)
118 acl.submit()
119 except Exception as e:
120 LOG.error("Failed to create secret ACL. Error: %s", e)
121 raise exception.ManilaBarbicanACLError()
123 def delete_secret_access(self, context, secret_ref):
124 try:
125 user_barbican_client, secret_href = self.get_client_and_href(
126 context, secret_ref)
127 barbican_user_id = self._get_barbican_user_id()
129 # Remove a Barbican ACL for the barbican user.
130 acl_entity = user_barbican_client.acls.get(entity_ref=secret_href)
131 existing_users = acl_entity.read.users
132 remove_users = [barbican_user_id]
133 updated_users = set(existing_users).difference(remove_users)
134 acl_entity.read.users = list(updated_users)
136 acl_entity.submit()
137 except Exception as e:
138 LOG.error("Failed to delete secret ACL. Error: %s", e)
140 def get_secret_href(self, context, secret_ref):
141 try:
142 user_barbican_client, secret_href = self.get_client_and_href(
143 context, secret_ref)
144 return secret_href
145 except Exception as e:
146 LOG.error("Failed to get barbican secret href. Error: %s", e)
147 raise exception.ManilaBarbicanACLError()
150class BarbicanUserAppCreds(object):
151 def __init__(self, conf):
152 self.conf = conf
154 @property
155 def client(self):
156 return self.get_client()
158 def get_client(self):
159 _require_barbican_key_manager_backend(self.conf)
160 auth = ks_loading.load_auth_from_conf_options(self.conf,
161 BARBICAN_GROUP)
162 sess = ks_session.Session(auth=auth)
163 return ks_client.Client(
164 session=sess,
165 interface=self.conf.barbican.endpoint_type,
166 region_name=self.conf.barbican.region_name)
168 def get_application_credentials(self, context, application_credential_id):
169 if not application_credential_id:
170 LOG.warning("Missing application credentials ID")
171 raise exception.ManilaBarbicanAppCredsError()
173 try:
174 return self.client.application_credentials.get(
175 application_credential=application_credential_id)
176 except Exception as e:
177 LOG.error("Aborting App Creds request due to error: %s", e)
178 raise exception.ManilaBarbicanAppCredsError()
180 def create_application_credentials(self, context, secret):
181 try:
182 secrets_path = "/key-manager/v1/secrets"
183 return self.client.application_credentials.create(
184 name='manila_barbican_' + uuidutils.generate_uuid(),
185 user=self.client.session.get_user_id(),
186 roles=[{'name': 'service'}],
187 secret=str(secret),
188 access_rules=[
189 {
190 "path": secrets_path + "/%s" % secret,
191 "method": "GET",
192 "service": "key-manager",
193 },
194 {
195 "path": secrets_path + "/%s/payload" % secret,
196 "method": "GET",
197 "service": "key-manager",
198 }
199 ]
200 )
201 except Exception as e:
202 LOG.error("Aborting App Creds create due to error: %s", e)
203 raise exception.ManilaBarbicanAppCredsError()
205 def delete_application_credentials(self, context,
206 application_credential_id):
207 if not application_credential_id:
208 LOG.warning("Missing application credentials ID")
209 raise exception.ManilaBarbicanAppCredsError()
211 try:
212 return self.client.application_credentials.delete(
213 application_credential=application_credential_id)
214 except Exception as e:
215 LOG.error("Aborting App Creds request due to error: %s", e)
216 raise exception.ManilaBarbicanAppCredsError()
219def create_secret_access(context, secret_ref, conf=CONF):
220 BarbicanSecretACL(conf).create_secret_access(context, secret_ref)
223def delete_secret_access(context, secret_ref, conf=CONF):
224 BarbicanSecretACL(conf).delete_secret_access(context, secret_ref)
227def get_secret_href(context, secret_ref, conf=CONF):
228 return BarbicanSecretACL(conf).get_secret_href(context, secret_ref)
231def create_application_credentials(context, secret, conf=CONF):
232 return BarbicanUserAppCreds(conf).create_application_credentials(
233 context,
234 secret)
237def get_application_credentials(context, application_credential_id, conf=CONF):
238 return BarbicanUserAppCreds(conf).get_application_credentials(
239 context,
240 application_credential_id)
243def delete_application_credentials(context,
244 application_credential_id, conf=CONF):
245 BarbicanUserAppCreds(conf).delete_application_credentials(
246 context,
247 application_credential_id)