Coverage for manila/share/utils.py: 99%
68 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2012 OpenStack Foundation
2# Copyright (c) 2015 Rushil Chugh
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17"""Share-related Utilities and helpers."""
19from oslo_config import cfg
21from manila.common import constants
22from manila.db import migration
23from manila import rpc
24from manila import utils
# Pool name handed back by extract_host() when the host string carries no
# '#pool' component and the caller asked for the default pool name.
DEFAULT_POOL_NAME = '_pool0'

# Process-wide oslo.config options object; read below for the default
# notification host.
CONF = cfg.CONF
def extract_host(host, level='backend', use_default_pool_name=False):
    """Pull one component out of a ``host@backend#pool`` host string.

    :param host: Host string; the ``@backend`` and ``#pool`` parts are
                 optional as far as parsing is concerned.
    :param level: Which component to return. One of 'host', 'backend',
                  'backend_name' or 'pool'. Defaults to 'backend'.
    :param use_default_pool_name: Only consulted when level is 'pool' and
                                  the host string does not contain exactly
                                  one '#'. When it is ``True`` the result
                                  is DEFAULT_POOL_NAME, otherwise ``None``.
    :return: The requested component, or ``None`` when no pool is encoded
             (and no default was requested) or when ``level`` is not one
             of the recognised values.
    :raises IndexError: if level is 'backend_name' and the host string has
                        no '@' separator.

    Examples::

        host = 'HostA@BackendB#PoolC'
        extract_host(host, 'host')          # 'HostA'
        extract_host(host, 'backend')       # 'HostA@BackendB'
        extract_host(host, 'backend_name')  # 'BackendB'
        extract_host(host, 'pool')          # 'PoolC'

        host = 'HostX@BackendY'
        extract_host(host, 'pool')          # None
        extract_host(host, 'pool', True)    # '_pool0'
    """
    if level == 'pool':
        pieces = host.split('#')
        # Exactly one '#' means a pool is encoded; anything else counts as
        # "no pool info".
        if len(pieces) == 2:
            return pieces[1]
        # Identity check on purpose: only the literal True opts in.
        return DEFAULT_POOL_NAME if use_default_pool_name is True else None

    if level not in ('host', 'backend', 'backend_name'):
        # Unrecognised level: nothing to extract.
        return None

    # Every remaining level works on the part in front of the pool marker.
    host_and_backend = host.split('#')[0]
    if level == 'backend':
        return host_and_backend

    segments = host_and_backend.split('@')
    return segments[0] if level == 'host' else segments[1]
def append_host(host, pool):
    """Return ``host`` with ``#pool`` appended.

    When either argument is falsy, ``host`` is returned unchanged.
    """
    if host and pool:
        return "#".join((host, pool))
    return host
def get_active_replica(replica_list):
    """Return the first replica whose state is 'active', if any.

    :param replica_list: Iterable of replicas supporting
                         ``replica['replica_state']`` lookup.
    :return: The first replica whose 'replica_state' equals
             constants.REPLICA_STATE_ACTIVE, or ``None`` when there is
             no such replica.
    """
    active_replicas = (
        candidate for candidate in replica_list
        if candidate['replica_state'] == constants.REPLICA_STATE_ACTIVE
    )
    return next(active_replicas, None)
def change_rules_to_readonly(access_rules, add_rules, delete_rules):
    """Convert three rule collections to read-only rule dicts.

    Each collection is passed through
    cast_access_object_to_dict_in_readonly(), in argument order.

    :return: A 3-tuple ``(access_rules, add_rules, delete_rules)`` of the
             converted lists.
    """
    return tuple(
        cast_access_object_to_dict_in_readonly(rule_set)
        for rule_set in (access_rules, add_rules, delete_rules)
    )
def cast_access_object_to_dict_in_readonly(rules):
    """Build plain dicts from access rules, forcing read-only access.

    :param rules: Iterable of rules supporting ``rule['access_type']`` and
                  ``rule['access_to']`` lookup.
    :return: A new list with one dict per rule. 'access_type' and
             'access_to' are copied from the rule; 'access_level' is
             always constants.ACCESS_LEVEL_RO.
    """
    return [
        {
            'access_level': constants.ACCESS_LEVEL_RO,
            'access_type': entry['access_type'],
            'access_to': entry['access_to'],
        }
        for entry in rules
    ]
@utils.if_notifications_enabled
def notify_about_share_usage(context, share, share_instance,
                             event_suffix, extra_usage_info=None, host=None):
    """Emit an info-level 'share.<event_suffix>' notification.

    :param context: Passed through to the notifier.
    :param share: Share the payload is built from.
    :param share_instance: Share instance the payload is built from.
    :param event_suffix: Appended to 'share.' to form the event type.
    :param extra_usage_info: Optional dict of extra payload entries; a
                             falsy value is treated as an empty dict.
    :param host: Publisher host handed to rpc.get_notifier(); a falsy
                 value falls back to CONF.host.

    NOTE(review): the decorator presumably makes this a no-op when
    notifications are disabled -- confirm in manila.utils.
    """
    publisher_host = host or CONF.host
    payload = _usage_from_share(share, share_instance,
                                **(extra_usage_info or {}))

    notifier = rpc.get_notifier("share", publisher_host)
    notifier.info(context, 'share.%s' % event_suffix, payload)
def _usage_from_share(share_ref, share_instance_ref, **extra_usage_info):
    """Assemble the notification payload for a share and its instance.

    :param share_ref: Mapping-like share; supplies the id, owner, snapshot,
                      group, size, naming, protocol and visibility fields.
    :param share_instance_ref: Mapping-like share instance; supplies the
                               availability zone, host, status and share
                               type fields.
    :param extra_usage_info: Extra entries merged in last, so they
                             override any generated key of the same name.
    :return: A new dict holding the payload.
    """
    # (payload key, key to read from share_ref), in payload order.
    share_fields = (
        ('share_id', 'id'),
        ('user_id', 'user_id'),
        ('project_id', 'project_id'),
        ('snapshot_id', 'snapshot_id'),
        ('share_group_id', 'share_group_id'),
        ('size', 'size'),
        ('name', 'display_name'),
        ('description', 'display_description'),
        ('proto', 'share_proto'),
        ('is_public', 'is_public'),
    )
    # Instance fields that keep their own name in the payload.
    instance_fields = (
        'availability_zone',
        'host',
        'status',
        'share_type_id',
    )

    usage_info = {
        payload_key: share_ref[source_key]
        for payload_key, source_key in share_fields
    }
    for field in instance_fields:
        usage_info[field] = share_instance_ref[field]
    usage_info['share_type'] = share_instance_ref['share_type']['name']

    usage_info.update(extra_usage_info)
    return usage_info
def get_recent_db_migration_id():
    """Return the value reported by ``manila.db.migration.version()``."""
    migration_id = migration.version()
    return migration_id
def is_az_subnets_compatible(subnet_list, new_subnet_list):
    """Check that every current subnet has a counterpart in the new list.

    Two subnets match when both their 'neutron_net_id' and their
    'neutron_subnet_id' (read with ``.get()``) are equal.

    :param subnet_list: Current subnets (mapping-like objects).
    :param new_subnet_list: Candidate subnets (mapping-like objects).
    :return: ``False`` when the lists differ in length or when some entry
             of ``subnet_list`` has no match in ``new_subnet_list``;
             ``True`` otherwise.
    """
    if len(subnet_list) != len(new_subnet_list):
        return False

    def _same_network(current, candidate):
        # Net id is compared first; subnet id only when the net ids agree.
        return (
            current.get('neutron_net_id') ==
            candidate.get('neutron_net_id') and
            current.get('neutron_subnet_id') ==
            candidate.get('neutron_subnet_id')
        )

    return all(
        any(_same_network(current, candidate)
            for candidate in new_subnet_list)
        for current in subnet_list
    )