Coverage for manila/api/views/share_networks.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2014 OpenStack LLC. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from manila.api import common 

17 

18 

class ViewBuilder(common.ViewBuilder):
    """Model a server API response as a python dictionary."""

    _collection_name = 'share_networks'
    _detail_version_modifiers = ["add_gateway", "add_mtu", "add_nova_net_id",
                                 "add_subnets",
                                 "add_status_and_sec_service_update_fields",
                                 "add_network_allocation_update_support_field",
                                 "add_subnet_with_metadata"]

    # Attributes that are copied from the default subnet onto the share
    # network (see _update_share_network_info) and that are dropped from the
    # top level of the view once the subnets themselves are listed (see
    # add_subnets / add_subnet_with_metadata).
    _SUBNET_LEVEL_ATTRS = (
        'neutron_net_id', 'neutron_subnet_id', 'network_type',
        'segmentation_id', 'cidr', 'ip_version', 'gateway', 'mtu')

    def build_share_network(self, request, share_network):
        """View of a share network.

        :param request: API request, forwarded to the view helpers.
        :param share_network: mapping-like share network record.
        :returns: ``{'share_network': <detailed view dict>}``.
        """
        return {'share_network': self._build_share_network_view(
            request, share_network)}

    def build_share_networks(self, request, share_networks, is_detail=True):
        """View of a list of share networks.

        :param request: API request, forwarded to the view helpers.
        :param share_networks: iterable of mapping-like share networks.
        :param is_detail: when False only 'id' and 'name' are returned for
            each share network.
        :returns: ``{'share_networks': [<view dict>, ...]}``.
        """
        return {'share_networks':
                [self._build_share_network_view(
                    request, share_network, is_detail)
                 for share_network in share_networks]}

    def build_security_service_update_check(self, request, params, result):
        """View of security service add or update check.

        :param request: API request; ``request.environ['manila.context']``
            is consulted for ``is_admin``.
        :param params: mapping that may hold 'current_service_id',
            'new_service_id' and 'security_service_id'.
        :param result: mapping with 'compatible' and, for admin contexts,
            'hosts_check_result'.
        :returns: dict with 'compatible', 'requested_operation' and, for
            admin contexts only, 'hosts_check_result'.
        """
        context = request.environ['manila.context']
        current_service_id = params.get('current_service_id')
        requested_operation = {
            # A current service id means an existing service is replaced;
            # without one the request adds a new service.
            'operation': ('update_security_service'
                          if current_service_id
                          else 'add_security_service'),
            'current_security_service': current_service_id,
            'new_security_service': (params.get('new_service_id') or
                                     params.get('security_service_id'))
        }
        view = {
            'compatible': result['compatible'],
            'requested_operation': requested_operation,
        }
        if context.is_admin:
            view['hosts_check_result'] = result['hosts_check_result']
        return view

    def build_share_network_subnet_create_check(self, request, result):
        """View of share network subnet create check.

        :param request: API request; ``request.environ['manila.context']``
            is consulted for ``is_admin``.
        :param result: mapping with 'compatible' and, for admin contexts,
            'hosts_check_result'.
        :returns: dict with 'compatible' and, for admin contexts only,
            'hosts_check_result'.
        """
        context = request.environ['manila.context']
        view = {
            'compatible': result['compatible'],
        }
        if context.is_admin:
            view['hosts_check_result'] = result['hosts_check_result']
        return view

    def _update_share_network_info(self, request, share_network):
        """Copy the default subnet's network attributes onto the network.

        ``share_network`` is updated in place; the versioned modifiers
        (e.g. add_gateway, add_mtu) later read these values from it. Only
        subnets whose 'is_default' is exactly ``True`` are considered, and
        if several qualify the last one wins.
        """
        for sns in share_network.get('share_network_subnets') or []:
            if sns.get('is_default') is True:
                share_network.update({
                    attr: sns.get(attr)
                    for attr in self._SUBNET_LEVEL_ATTRS})

    def _build_share_network_view(self, request, share_network,
                                  is_detail=True):
        """Build the view dict of a single share network.

        :param request: API request, forwarded to the versioned modifiers.
        :param share_network: mapping-like share network record; updated
            in place by _update_share_network_info for detailed views.
        :param is_detail: when False only 'id' and 'name' are returned.
        :returns: the view dict.
        """
        sn = {
            'id': share_network.get('id'),
            'name': share_network.get('name'),
        }
        if is_detail:
            self._update_share_network_info(request, share_network)
            sn.update({
                'project_id': share_network.get('project_id'),
                'created_at': share_network.get('created_at'),
                'updated_at': share_network.get('updated_at'),
                'neutron_net_id': share_network.get('neutron_net_id'),
                'neutron_subnet_id': share_network.get('neutron_subnet_id'),
                'network_type': share_network.get('network_type'),
                'segmentation_id': share_network.get('segmentation_id'),
                'cidr': share_network.get('cidr'),
                'ip_version': share_network.get('ip_version'),
                'description': share_network.get('description'),
            })

            # Inherited helper; presumably applies the modifiers named in
            # _detail_version_modifiers that match the request's version.
            self.update_versioned_resource_dict(request, sn, share_network)
        return sn

    @staticmethod
    def _build_subnet_views(network, with_metadata=False):
        """Return the list of subnet view dicts of ``network``.

        A missing or ``None`` 'share_network_subnets' value yields an empty
        list, matching the guard used in _update_share_network_info.

        :param network: mapping-like share network record.
        :param with_metadata: when True each subnet view also carries a
            'metadata' key read from the subnet's 'subnet_metadata'.
        """
        subnets = []
        for sns in network.get('share_network_subnets') or []:
            subnet = {
                'id': sns.get('id'),
                'availability_zone': sns.get('availability_zone'),
                'created_at': sns.get('created_at'),
                'updated_at': sns.get('updated_at'),
                'segmentation_id': sns.get('segmentation_id'),
                'neutron_net_id': sns.get('neutron_net_id'),
                'neutron_subnet_id': sns.get('neutron_subnet_id'),
                'ip_version': sns.get('ip_version'),
                'cidr': sns.get('cidr'),
                'network_type': sns.get('network_type'),
                'mtu': sns.get('mtu'),
                'gateway': sns.get('gateway'),
            }
            if with_metadata:
                subnet['metadata'] = sns.get('subnet_metadata')
            subnets.append(subnet)
        return subnets

    @classmethod
    def _drop_subnet_level_attrs(cls, network_dict):
        """Remove the subnet-level attributes from the top-level view.

        A default is passed to ``pop`` so that an attribute which was never
        added to ``network_dict`` (e.g. 'gateway' or 'mtu' when their
        modifier did not run) does not raise ``KeyError``.
        """
        for attr in cls._SUBNET_LEVEL_ATTRS:
            network_dict.pop(attr, None)

    @common.ViewBuilder.versioned_method("2.51", "2.77")
    def add_subnets(self, context, network_dict, network):
        """List the subnets and drop the top-level network attributes.

        Sets ``network_dict['share_network_subnets']`` and removes the
        attributes that are now reported per subnet.
        """
        network_dict['share_network_subnets'] = self._build_subnet_views(
            network)
        self._drop_subnet_level_attrs(network_dict)

    @common.ViewBuilder.versioned_method("2.18")
    def add_gateway(self, context, network_dict, network):
        """Add the network's 'gateway' to the view."""
        network_dict['gateway'] = network.get('gateway')

    @common.ViewBuilder.versioned_method("2.20")
    def add_mtu(self, context, network_dict, network):
        """Add the network's 'mtu' to the view."""
        network_dict['mtu'] = network.get('mtu')

    @common.ViewBuilder.versioned_method("1.0", "2.25")
    def add_nova_net_id(self, context, network_dict, network):
        """Add a 'nova_net_id' key whose value is always None."""
        network_dict['nova_net_id'] = None

    @common.ViewBuilder.versioned_method("2.63")
    def add_status_and_sec_service_update_fields(
            self, context, network_dict, network):
        """Add 'status' and 'security_service_update_support'."""
        network_dict['status'] = network.get('status')
        network_dict['security_service_update_support'] = network.get(
            'security_service_update_support')

    @common.ViewBuilder.versioned_method("2.70")
    def add_network_allocation_update_support_field(
            self, context, network_dict, network):
        """Add 'network_allocation_update_support' to the view."""
        network_dict['network_allocation_update_support'] = network.get(
            'network_allocation_update_support')

    @common.ViewBuilder.versioned_method("2.78")
    def add_subnet_with_metadata(self, context, network_dict, network):
        """List the subnets, including their metadata.

        Same as add_subnets, except that every subnet view also carries a
        'metadata' key taken from the subnet's 'subnet_metadata'.
        """
        network_dict['share_network_subnets'] = self._build_subnet_views(
            network, with_metadata=True)
        self._drop_subnet_level_attrs(network_dict)