Coverage for manila/tests/db_utils.py: 100%

104 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2015 Mirantis Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import copy 

17 

18from oslo_utils import uuidutils 

19 

20from manila.common import constants 

21from manila import context 

22from manila import db 

23from manila.message import message_levels 

24 

25 

def _create_db_row(method, default_values, custom_values):
    """Assemble row values and pass them to a DB API create call.

    :param method: callable invoked as ``method(admin_context, values)``.
    :param default_values: dict of baseline values. Unless the defaults are
        overridden, it is updated in place with a deep copy of
        ``custom_values``.
    :param custom_values: dict of caller-supplied values. The optional
        ``override_defaults`` key is popped from it; when that value is
        truthy, ``custom_values`` alone becomes the row values.
    :returns: whatever ``method`` returns.
    """
    if custom_values.pop('override_defaults', None):
        # Caller asked for full control: ignore the baseline entirely.
        row_values = custom_values
    else:
        # Deep copy so the stored values never alias the caller's objects.
        default_values.update(copy.deepcopy(custom_values))
        row_values = default_values
    return method(context.get_admin_context(), row_values)

33 

34 

def create_share_group(**kwargs):
    """Create a share group row through ``db.share_group_create``.

    ``kwargs`` are merged over the fake defaults below by
    ``_create_db_row`` (or replace them when ``override_defaults`` is set).
    """
    defaults = dict(
        share_network_id=None,
        share_server_id=None,
        user_id='fake',
        project_id='fake',
        status=constants.STATUS_CREATING,
        host='fake_host',
    )
    return _create_db_row(db.share_group_create, defaults, kwargs)

46 

47 

def create_share_group_snapshot(share_group_id, **kwargs):
    """Create a share group snapshot row for the given share group.

    :param share_group_id: value stored under ``share_group_id`` by default.
    :param kwargs: values merged over the fake defaults by
        ``_create_db_row``.
    :returns: the result of ``db.share_group_snapshot_create``.
    """
    defaults = dict(
        share_group_id=share_group_id,
        user_id='fake',
        project_id='fake',
        status=constants.STATUS_CREATING,
    )
    return _create_db_row(db.share_group_snapshot_create, defaults, kwargs)

57 

58 

def create_share_group_snapshot_member(share_group_snapshot_id, **kwargs):
    """Create a member row belonging to a share group snapshot.

    :param share_group_snapshot_id: value stored under
        ``share_group_snapshot_id`` by default.
    :param kwargs: values merged over the fake defaults by
        ``_create_db_row``.
    :returns: the result of ``db.share_group_snapshot_member_create``.
    """
    defaults = dict(
        share_proto="NFS",
        size=0,
        share_instance_id=None,
        user_id='fake',
        project_id='fake',
        status='creating',
        share_group_snapshot_id=share_group_snapshot_id,
    )
    create_member = db.share_group_snapshot_member_create
    return _create_db_row(create_member, defaults, kwargs)

72 

73 

def create_share_access(**kwargs):
    """Create a share access row through ``db.share_access_create``.

    Defaults to a fixed ``id`` and an ``ip`` access type; ``kwargs`` are
    merged over those defaults by ``_create_db_row``.
    """
    defaults = dict(
        id='fake_id',
        access_type='ip',
        access_to='fake_ip_address',
    )
    return _create_db_row(db.share_access_create, defaults, kwargs)

81 

82 

def create_share(**kwargs):
    """Create a share row through ``db.share_create``.

    ``kwargs`` are merged over the fake defaults below by
    ``_create_db_row`` (or replace them when ``override_defaults`` is set).
    """
    defaults = dict(
        share_proto="NFS",
        size=0,
        snapshot_id=None,
        share_network_id=None,
        share_server_id=None,
        user_id='fake',
        project_id='fake',
        metadata={},
        availability_zone='fake_availability_zone',
        status=constants.STATUS_CREATING,
        host='fake_host',
        is_soft_deleted=False,
        mount_point_name='fake_mp',
    )
    return _create_db_row(db.share_create, defaults, kwargs)

101 

102 

def create_share_without_instance(**kwargs):
    """Create a share row, passing ``False`` as the third DB API argument.

    Unlike ``create_share`` this does not honour ``override_defaults``: a
    deep copy of ``kwargs`` is always layered over the fake defaults.
    """
    defaults = dict(
        share_proto="NFS",
        size=0,
        snapshot_id=None,
        share_network_id=None,
        share_server_id=None,
        user_id='fake',
        project_id='fake',
        metadata={},
        availability_zone='fake_availability_zone',
        status=constants.STATUS_CREATING,
        host='fake_host',
        is_soft_deleted=False,
        mount_point_name=None,
    )
    values = {**defaults, **copy.deepcopy(kwargs)}
    # NOTE(review): the trailing False presumably tells db.share_create not
    # to create a share instance (per this helper's name) -- confirm against
    # the DB API signature.
    return db.share_create(context.get_admin_context(), values, False)

121 

122 

def create_share_instance(**kwargs):
    """Create a share instance row for the share named by ``share_id``.

    :param kwargs: must contain ``share_id``; it is removed and passed
        separately, and the remaining items become the instance values.
    :raises KeyError: if ``share_id`` is not supplied.
    """
    admin_ctxt = context.get_admin_context()
    parent_share_id = kwargs.pop('share_id')
    return db.share_instance_create(admin_ctxt, parent_share_id, kwargs)

127 

128 

def create_share_replica(**kwargs):
    """Create a share replica row via ``db.share_instance_create``.

    When ``share_id`` is not among ``kwargs``, a new share is created with
    ``create_share()`` and its id is used instead. The remaining ``kwargs``
    become the instance values.
    """
    if 'share_id' in kwargs:
        parent_share_id = kwargs.pop('share_id')
    else:
        parent_share_id = create_share()['id']

    return db.share_instance_create(
        context.get_admin_context(), parent_share_id, kwargs)

138 

139 

def create_snapshot(**kwargs):
    """Create a snapshot row through ``db.share_snapshot_create``.

    :param kwargs: values layered over the fake defaults. The optional
        ``with_share`` flag is popped first; when truthy, a share in
        ``constants.STATUS_AVAILABLE`` (sized from ``kwargs['size']``,
        default 0) is created and its id becomes the default ``share_id``.
    :returns: the result of ``db.share_snapshot_create``.
    """
    parent_share_id = None
    if kwargs.pop('with_share', False):
        parent_share = create_share(status=constants.STATUS_AVAILABLE,
                                    size=kwargs.get('size', 0))
        parent_share_id = parent_share['id']

    values = dict(
        share_proto="NFS",
        size=0,
        share_id=parent_share_id,
        user_id='fake',
        project_id='fake',
        status='creating',
        provider_location='fake',
    )
    values.update(kwargs)

    # Only ask the DB layer for a snapshot instance when the caller did not
    # supply an 'instances' value.
    needs_instance = 'instances' not in values
    return db.share_snapshot_create(
        context.get_admin_context(),
        values,
        create_snapshot_instance=needs_instance,
    )

165 

166 

def create_snapshot_instance(snapshot_id, **kwargs):
    """Create a snapshot instance row for the given snapshot.

    :param snapshot_id: passed to ``db.share_snapshot_instance_create`` as
        its second argument.
    :param kwargs: values layered over the fake defaults.
    :returns: the result of ``db.share_snapshot_instance_create``.
    """
    defaults = dict(
        provider_location='fake_provider_location',
        progress='0%',
        status=constants.STATUS_CREATING,
    )
    values = {**defaults, **kwargs}
    return db.share_snapshot_instance_create(
        context.get_admin_context(), snapshot_id, values)

179 

180 

def create_snapshot_instance_export_locations(snapshot_id, **kwargs):
    """Create an export location row for a snapshot instance.

    :param snapshot_id: stored under ``share_snapshot_instance_id`` unless
        ``kwargs`` supplies that key itself.
    :param kwargs: additional values for the export location.
    :returns: the result of
        ``db.share_snapshot_instance_export_location_create``.
    """
    values = {'share_snapshot_instance_id': snapshot_id, **kwargs}
    return db.share_snapshot_instance_export_location_create(
        context.get_admin_context(), values)

190 

191 

def create_access(**kwargs):
    """Create an access rule row and set the state of its mappings.

    :param kwargs: values layered over the fake defaults. Two keys are
        consumed here rather than stored on the rule: ``state`` (default
        ``constants.ACCESS_STATE_QUEUED_TO_APPLY``), which is applied to
        every instance mapping of the new rule, and ``share_id``; when that
        is missing or falsy, a new share is created with ``create_share()``.
    :returns: the access rule returned by ``db.share_access_create``.
    """
    state = kwargs.pop('state', constants.ACCESS_STATE_QUEUED_TO_APPLY)
    access = {
        'access_type': 'fake_type',
        'access_to': 'fake_IP',
        'share_id': kwargs.pop('share_id', None) or create_share()['id'],
    }
    # _create_db_row merges a deep copy of kwargs into the defaults itself
    # (after popping 'override_defaults'), so no separate update is done
    # here; a pre-merge would leak a falsy 'override_defaults' key into the
    # row values.
    share_access_rule = _create_db_row(db.share_access_create, access, kwargs)

    for mapping in share_access_rule.instance_mappings:
        db.share_instance_access_update(
            context.get_admin_context(), share_access_rule['id'],
            mapping.share_instance_id, {'state': state})

    return share_access_rule

209 

210 

def create_snapshot_access(**kwargs):
    """Create a snapshot access rule via ``db.share_snapshot_access_create``.

    ``kwargs`` are merged over the fake defaults by ``_create_db_row``;
    ``share_snapshot_id`` defaults to ``None``.
    """
    defaults = dict(
        access_type='fake_type',
        access_to='fake_IP',
        share_snapshot_id=None,
    )
    return _create_db_row(db.share_snapshot_access_create, defaults, kwargs)

219 

220 

def create_share_server(**kwargs):
    """Create a share server row and return it as re-read from the DB.

    :param kwargs: values merged over the fake defaults. The optional
        ``backend_details`` item is popped and, when non-empty, stored with
        ``db.share_server_backend_details_set`` after the server is created.
    :returns: the result of ``db.share_server_get`` for the new server.
    """
    details = kwargs.pop('backend_details', {})
    defaults = dict(host='host1', status=constants.STATUS_ACTIVE)
    created = _create_db_row(db.share_server_create, defaults, kwargs)
    server_id = created['id']

    if details:
        db.share_server_backend_details_set(
            context.get_admin_context(), server_id, details)

    # Fetch again so the returned object reflects any details just set.
    return db.share_server_get(context.get_admin_context(), server_id)

234 

235 

def create_share_type(**kwargs):
    """Create a share type row through ``db.share_type_create``.

    Defaults to a public type named ``fake_type``; ``kwargs`` are merged
    over those defaults by ``_create_db_row``.
    """
    defaults = dict(name='fake_type', is_public=True)
    return _create_db_row(db.share_type_create, defaults, kwargs)

245 

246 

def create_share_group_type(**kwargs):
    """Create a share group type row via ``db.share_group_type_create``.

    Defaults to a public type named ``fake_group_type``; ``kwargs`` are
    merged over those defaults by ``_create_db_row``.
    """
    defaults = dict(name='fake_group_type', is_public=True)
    return _create_db_row(db.share_group_type_create, defaults, kwargs)

257 

258 

def create_share_network(**kwargs):
    """Create a share network row through ``db.share_network_create``.

    ``kwargs`` are merged over the fake defaults below by
    ``_create_db_row`` (or replace them when ``override_defaults`` is set).
    """
    defaults = dict(
        user_id='fake',
        project_id='fake',
        status='active',
        name='whatever',
        description='fake description',
    )
    return _create_db_row(db.share_network_create, defaults, kwargs)

269 

270 

def create_share_network_subnet(**kwargs):
    """Create a share network subnet row.

    The row is created through ``db.share_network_subnet_create``;
    ``kwargs`` are merged over the fake defaults below by
    ``_create_db_row``. Note the default ``id`` is a fixed string.
    """
    defaults = dict(
        id='fake_sns_id',
        neutron_net_id='fake-neutron-net',
        neutron_subnet_id='fake-neutron-subnet',
        network_type='vlan',
        segmentation_id=1000,
        cidr='10.0.0.0/24',
        ip_version=4,
        availability_zone_id='fake_zone_id',
        share_network_id='fake_sn_id',
        gateway=None,
        mtu=None,
    )
    return _create_db_row(db.share_network_subnet_create, defaults, kwargs)

287 

288 

def create_security_service(**kwargs):
    """Create a security service row, optionally tied to a share network.

    :param kwargs: values merged over the fake defaults. The optional
        ``share_network_id`` item is popped; when truthy, the new service is
        added to that network with ``db.share_network_add_security_service``.
    :returns: the result of ``db.security_service_create``.
    """
    network_id = kwargs.pop('share_network_id', None)
    defaults = dict(type="FAKE", project_id='fake-project-id')
    created = _create_db_row(db.security_service_create, defaults, kwargs)

    if not network_id:
        return created

    db.share_network_add_security_service(
        context.get_admin_context(), network_id, created['id'])
    return created

302 

303 

def create_message(**kwargs):
    """Create a message row through ``db.message_create``.

    Defaults to ``message_levels.ERROR``; ``kwargs`` are merged over the
    fake defaults by ``_create_db_row``.
    """
    defaults = dict(
        action='fake_Action',
        project_id='fake-project-id',
        message_level=message_levels.ERROR,
    )
    return _create_db_row(db.message_create, defaults, kwargs)

311 

312 

def create_transfer(**kwargs):
    """Create a transfer row through ``db.transfer_create``.

    Defaults to ``constants.SHARE_RESOURCE_TYPE`` as the resource type;
    ``kwargs`` are merged over the fake defaults by ``_create_db_row``.
    """
    defaults = dict(
        display_name='display_name',
        salt='salt',
        crypt_hash='crypt_hash',
        resource_type=constants.SHARE_RESOURCE_TYPE,
    )
    return _create_db_row(db.transfer_create, defaults, kwargs)

319 

320 

def create_backup(share_id, **kwargs):
    """Create a share backup row for the given share.

    :param share_id: passed to ``db.share_backup_create`` as its second
        argument.
    :param kwargs: values layered over the fake defaults.
    :returns: the result of ``db.share_backup_create``.
    """
    defaults = dict(
        host="fake_host",
        share_network_id=None,
        share_server_id=None,
        user_id='fake',
        project_id='fake',
        availability_zone='fake_availability_zone',
        status=constants.STATUS_CREATING,
        topic='fake_topic',
        description='fake_description',
        size='1',
    )
    values = {**defaults, **kwargs}
    return db.share_backup_create(
        context.get_admin_context(), share_id, values)

338 

339 

def create_lock(**kwargs):
    """Create a resource lock row through ``db.resource_lock_create``.

    The default resource, user and project ids are freshly generated UUIDs
    (the latter two without dashes); ``kwargs`` are merged over the
    defaults by ``_create_db_row``.
    """
    defaults = dict(
        resource_id=uuidutils.generate_uuid(),
        user_id=uuidutils.generate_uuid(dashed=False),
        project_id=uuidutils.generate_uuid(dashed=False),
        lock_context='user',
        lock_reason='for the tests',
        resource_type='share',
        resource_action='delete',
    )
    return _create_db_row(db.resource_lock_create, defaults, kwargs)