Coverage for manila/share/utils.py: 99%

68 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2012 OpenStack Foundation 

2# Copyright (c) 2015 Rushil Chugh 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17"""Share-related Utilities and helpers.""" 

18 

19from oslo_config import cfg 

20 

21from manila.common import constants 

22from manila.db import migration 

23from manila import rpc 

24from manila import utils 

25 

26DEFAULT_POOL_NAME = '_pool0' 

27CONF = cfg.CONF 

28 

29 

30def extract_host(host, level='backend', use_default_pool_name=False): 

31 """Extract Host, Backend or Pool information from host string. 

32 

33 :param host: String for host, which could include host@backend#pool info 

34 :param level: Indicate which level of information should be extracted 

35 from host string. Level can be 'host', 'backend', 'pool', 

36 or 'backend_name', default value is 'backend' 

37 :param use_default_pool_name: This flag specifies what to do 

38 if level == 'pool' and there is no 'pool' info 

39 encoded in host string. default_pool_name=True 

40 will return DEFAULT_POOL_NAME, otherwise it will 

41 return None. Default value of this parameter 

42 is False. 

43 :return: expected level of information 

44 

45 For example: 

46 host = 'HostA@BackendB#PoolC' 

47 ret = extract_host(host, 'host') 

48 # ret is 'HostA' 

49 ret = extract_host(host, 'backend') 

50 # ret is 'HostA@BackendB' 

51 ret = extract_host(host, 'pool') 

52 # ret is 'PoolC' 

53 ret = extract_host(host, 'backend_name') 

54 # ret is 'BackendB' 

55 host = 'HostX@BackendY' 

56 ret = extract_host(host, 'pool') 

57 # ret is None 

58 ret = extract_host(host, 'pool', True) 

59 # ret is '_pool0' 

60 """ 

61 if level == 'host': 

62 # Make sure pool is not included 

63 hst = host.split('#')[0] 

64 return hst.split('@')[0] 

65 if level == 'backend_name': 

66 hst = host.split('#')[0] 

67 return hst.split('@')[1] 

68 elif level == 'backend': 

69 return host.split('#')[0] 

70 elif level == 'pool': 70 ↛ exitline 70 didn't return from function 'extract_host' because the condition on line 70 was always true

71 lst = host.split('#') 

72 if len(lst) == 2: 

73 return lst[1] 

74 elif use_default_pool_name is True: 

75 return DEFAULT_POOL_NAME 

76 else: 

77 return None 

78 

79 

80def append_host(host, pool): 

81 """Encode pool into host info.""" 

82 if not host or not pool: 

83 return host 

84 

85 new_host = "#".join([host, pool]) 

86 return new_host 

87 

88 

89def get_active_replica(replica_list): 

90 """Returns the first 'active' replica in the list of replicas provided.""" 

91 for replica in replica_list: 

92 if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE: 

93 return replica 

94 

95 

96def change_rules_to_readonly(access_rules, add_rules, delete_rules): 

97 dict_access_rules = cast_access_object_to_dict_in_readonly(access_rules) 

98 dict_add_rules = cast_access_object_to_dict_in_readonly(add_rules) 

99 dict_delete_rules = cast_access_object_to_dict_in_readonly(delete_rules) 

100 return dict_access_rules, dict_add_rules, dict_delete_rules 

101 

102 

103def cast_access_object_to_dict_in_readonly(rules): 

104 dict_rules = [] 

105 for rule in rules: 

106 dict_rules.append({ 

107 'access_level': constants.ACCESS_LEVEL_RO, 

108 'access_type': rule['access_type'], 

109 'access_to': rule['access_to'] 

110 }) 

111 return dict_rules 

112 

113 

114@utils.if_notifications_enabled 

115def notify_about_share_usage(context, share, share_instance, 

116 event_suffix, extra_usage_info=None, host=None): 

117 

118 if not host: 

119 host = CONF.host 

120 

121 if not extra_usage_info: 

122 extra_usage_info = {} 

123 

124 usage_info = _usage_from_share(share, share_instance, **extra_usage_info) 

125 

126 rpc.get_notifier("share", host).info(context, 'share.%s' % event_suffix, 

127 usage_info) 

128 

129 

130def _usage_from_share(share_ref, share_instance_ref, **extra_usage_info): 

131 

132 usage_info = { 

133 'share_id': share_ref['id'], 

134 'user_id': share_ref['user_id'], 

135 'project_id': share_ref['project_id'], 

136 'snapshot_id': share_ref['snapshot_id'], 

137 'share_group_id': share_ref['share_group_id'], 

138 'size': share_ref['size'], 

139 'name': share_ref['display_name'], 

140 'description': share_ref['display_description'], 

141 'proto': share_ref['share_proto'], 

142 'is_public': share_ref['is_public'], 

143 'availability_zone': share_instance_ref['availability_zone'], 

144 'host': share_instance_ref['host'], 

145 'status': share_instance_ref['status'], 

146 'share_type_id': share_instance_ref['share_type_id'], 

147 'share_type': share_instance_ref['share_type']['name'], 

148 } 

149 

150 usage_info.update(extra_usage_info) 

151 

152 return usage_info 

153 

154 

155def get_recent_db_migration_id(): 

156 return migration.version() 

157 

158 

159def is_az_subnets_compatible(subnet_list, new_subnet_list): 

160 if len(subnet_list) != len(new_subnet_list): 

161 return False 

162 

163 for subnet in subnet_list: 

164 found_compatible = False 

165 for new_subnet in new_subnet_list: 

166 if (subnet.get('neutron_net_id') == 

167 new_subnet.get('neutron_net_id') and 

168 subnet.get('neutron_subnet_id') == 

169 new_subnet.get('neutron_subnet_id')): 

170 found_compatible = True 

171 break 

172 if not found_compatible: 

173 return False 

174 

175 return True