Coverage for manila/tests/api/contrib/stubs.py: 96%

79 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2010 OpenStack LLC. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from collections import abc 

17import datetime 

18 

19from manila.common import constants 

20from manila import exception as exc 

21 

22FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' 

23FAKE_UUIDS = {} 

24 

25 

26def stub_share(id, **kwargs): 

27 share = { 

28 'id': id, 

29 'share_proto': 'FAKEPROTO', 

30 'export_location': 'fake_location', 

31 'export_locations': ['fake_location', 'fake_location2'], 

32 'user_id': 'fakeuser', 

33 'project_id': 'fakeproject', 

34 'size': 1, 

35 'access_rules_status': 'active', 

36 'status': 'fakestatus', 

37 'display_name': 'displayname', 

38 'display_description': 'displaydesc', 

39 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1), 

40 'snapshot_id': '2', 

41 'share_type_id': '1', 

42 'is_public': False, 

43 'snapshot_support': True, 

44 'create_share_from_snapshot_support': True, 

45 'revert_to_snapshot_support': False, 

46 'mount_snapshot_support': False, 

47 'replication_type': None, 

48 'has_replicas': False, 

49 'is_soft_deleted': False, 

50 } 

51 

52 share_instance = { 

53 'host': 'fakehost', 

54 'availability_zone': 'fakeaz', 

55 'share_network_id': None, 

56 'share_server_id': 'fake_share_server_id', 

57 'access_rules_status': 'active', 

58 'share_type_id': '1', 

59 'encryption_key_ref': None, 

60 } 

61 if 'instance' in kwargs: 

62 share_instance.update(kwargs.pop('instance')) 

63 else: 

64 # remove any share instance kwargs so they don't go into the share 

65 for inst_key in share_instance.keys(): 

66 if inst_key in kwargs: 

67 share_instance[inst_key] = kwargs.pop(inst_key) 

68 

69 share.update(kwargs) 

70 

71 # NOTE(ameade): We must wrap the dictionary in an class in order to stub 

72 # object attributes. 

73 class wrapper(abc.Mapping): 

74 def __getitem__(self, name): 

75 if hasattr(self, name): 

76 return getattr(self, name) 

77 return self.__dict__[name] 

78 

79 def __iter__(self): 

80 return iter(self.__dict__) 

81 

82 def __len__(self): 

83 return len(self.__dict__) 

84 

85 def update(self, other): 

86 self.__dict__.update(other) 

87 

88 fake_share = wrapper() 

89 fake_share.instance = {'id': "fake_instance_id"} 

90 fake_share.instance.update(share_instance) 

91 fake_share.update(share) 

92 

93 # stub out is_busy based on task_state, this mimics what's in the Share 

94 # data model type 

95 if share.get('task_state') in constants.BUSY_TASK_STATES: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true

96 fake_share.is_busy = True 

97 else: 

98 fake_share.is_busy = False 

99 

100 fake_share.instances = [fake_share.instance] 

101 

102 return fake_share 

103 

104 

105def stub_snapshot(id, **kwargs): 

106 snapshot = { 

107 'id': id, 

108 'share_id': 'fakeshareid', 

109 'share_proto': 'fakesnapproto', 

110 'export_location': 'fakesnaplocation', 

111 'user_id': 'fakesnapuser', 

112 'project_id': 'fakesnapproject', 

113 'host': 'fakesnaphost', 

114 'share_size': 1, 

115 'size': 1, 

116 'status': 'fakesnapstatus', 

117 'aggregate_status': 'fakesnapstatus', 

118 'share_name': 'fakesharename', 

119 'display_name': 'displaysnapname', 

120 'display_description': 'displaysnapdesc', 

121 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1), 

122 'metadata': {} 

123 } 

124 snapshot.update(kwargs) 

125 return snapshot 

126 

127 

128def stub_share_type(id, **kwargs): 

129 share_type = { 

130 'id': id, 

131 'name': 'fakesharetype', 

132 'description': 'fakesharetypedescription', 

133 'is_public': True, 

134 } 

135 share_type.update(kwargs) 

136 return share_type 

137 

138 

139def stub_share_type_get(context, share_type_id, **kwargs): 

140 return stub_share_type(share_type_id, **kwargs) 

141 

142 

143def stub_share_get(self, context, share_id, **kwargs): 

144 return stub_share(share_id, **kwargs) 

145 

146 

147def stub_share_get_notfound(self, context, share_id, **kwargs): 

148 raise exc.NotFound 

149 

150 

151def stub_share_delete(self, context, *args, **param): 

152 pass 

153 

154 

155def stub_share_soft_delete(self, context, *args, **param): 

156 pass 

157 

158 

159def stub_share_restore(self, context, *args, **param): 

160 pass 

161 

162 

163def stub_share_update(self, context, *args, **param): 

164 share = stub_share('1') 

165 return share 

166 

167 

168def stub_snapshot_update(self, context, *args, **param): 

169 share = stub_share('1') 

170 return share 

171 

172 

173def stub_share_get_all_by_project(self, context, sort_key=None, sort_dir=None, 

174 search_opts={}): 

175 return [stub_share_get(self, context, '1')] 

176 

177 

178def stub_get_all_shares(self, context): 

179 return [stub_share(100, project_id='fake'), 

180 stub_share(101, project_id='superfake'), 

181 stub_share(102, project_id='superduperfake')] 

182 

183 

184def stub_snapshot_get(self, context, snapshot_id): 

185 return stub_snapshot(snapshot_id) 

186 

187 

188def stub_snapshot_get_notfound(self, context, snapshot_id): 

189 raise exc.NotFound 

190 

191 

192def stub_snapshot_create(self, context, share, display_name, 

193 display_description): 

194 return stub_snapshot(200, 

195 share_id=share['id'], 

196 display_name=display_name, 

197 display_description=display_description) 

198 

199 

200def stub_snapshot_delete(self, context, *args, **param): 

201 pass 

202 

203 

204def stub_snapshot_get_all_by_project(self, context, search_opts=None, 

205 limit=None, offset=None, 

206 sort_key=None, sort_dir=None): 

207 return [stub_snapshot_get(self, context, 2)] 

208 

209 

210def stub_share_group_snapshot_member(id, **kwargs): 

211 member = { 

212 'id': id, 

213 'share_id': 'fakeshareid', 

214 'share_instance_id': 'fakeshareinstanceid', 

215 'share_proto': 'fakesnapproto', 

216 'share_type_id': 'fake_share_type_id', 

217 'export_location': 'fakesnaplocation', 

218 'user_id': 'fakesnapuser', 

219 'project_id': 'fakesnapproject', 

220 'host': 'fakesnaphost', 

221 'share_size': 1, 

222 'size': 1, 

223 'status': 'fakesnapstatus', 

224 'created_at': datetime.datetime(1, 1, 1, 1, 1, 1), 

225 } 

226 member.update(kwargs) 

227 return member