Coverage for manila/tests/db/sqlalchemy/test_models.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Hitachi Data Systems. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15"""Testing of SQLAlchemy model classes.""" 

16 

17import ddt 

18 

19from manila.common import constants 

20from manila import context 

21from manila.db.sqlalchemy import api as db_api 

22from manila import test 

23from manila.tests import db_utils 

24 

25 

@ddt.ddt
class ShareTestCase(test.TestCase):
    """Testing of SQLAlchemy Share model class."""

    # Each status is listed exactly once so that ddt does not generate a
    # redundant, identical test case.
    @ddt.data(constants.STATUS_MANAGE_ERROR, constants.STATUS_CREATING,
              constants.STATUS_EXTENDING, constants.STATUS_DELETING,
              constants.STATUS_EXTENDING_ERROR,
              constants.STATUS_ERROR_DELETING, constants.STATUS_MANAGING)
    def test_share_instance_available(self, status):
        # One instance is 'available', the other carries the parametrized
        # status. share.instance is expected to report 'available' for
        # both orderings of the instance list.
        instance_list = [
            db_utils.create_share_instance(status=constants.STATUS_AVAILABLE,
                                           share_id='fake_id'),
            db_utils.create_share_instance(status=status,
                                           share_id='fake_id')
        ]

        share1 = db_utils.create_share(instances=instance_list)
        share2 = db_utils.create_share(instances=list(reversed(instance_list)))

        self.assertEqual(constants.STATUS_AVAILABLE, share1.instance['status'])
        self.assertEqual(constants.STATUS_AVAILABLE, share2.instance['status'])

    @ddt.data([constants.STATUS_MANAGE_ERROR, constants.STATUS_CREATING],
              [constants.STATUS_ERROR_DELETING, constants.STATUS_DELETING],
              [constants.STATUS_ERROR, constants.STATUS_MANAGING],
              [constants.STATUS_UNMANAGE_ERROR, constants.STATUS_UNMANAGING],
              [constants.STATUS_INACTIVE, constants.STATUS_EXTENDING],
              [constants.STATUS_SHRINKING_ERROR, constants.STATUS_SHRINKING])
    @ddt.unpack
    def test_share_instance_not_transitional(self, status, trans_status):
        # Each data pair is (status, trans_status). share.instance is
        # expected to report the first status of the pair, never the
        # second, for both orderings of the instance list.
        instance_list = [
            db_utils.create_share_instance(status=status,
                                           share_id='fake_id'),
            db_utils.create_share_instance(status=trans_status,
                                           share_id='fake_id')
        ]

        share1 = db_utils.create_share(instances=instance_list)
        share2 = db_utils.create_share(instances=list(reversed(instance_list)))

        self.assertEqual(status, share1.instance['status'])
        self.assertEqual(status, share2.instance['status'])

    def test_share_instance_creating(self):
        # A share created with status 'creating' is expected to expose that
        # same status through share.instance.
        share = db_utils.create_share(status=constants.STATUS_CREATING)

        self.assertEqual(constants.STATUS_CREATING, share.instance['status'])

    @ddt.data(constants.STATUS_REPLICATION_CHANGE, constants.STATUS_AVAILABLE,
              constants.STATUS_ERROR, constants.STATUS_CREATING)
    def test_share_instance_reverting(self, status):
        # A 'reverting' instance is expected to be the one reported by
        # share.instance, whatever the parametrized status of its sibling
        # and regardless of list order.
        instance_list = [
            db_utils.create_share_instance(
                status=constants.STATUS_REVERTING,
                share_id='fake_id'),
            db_utils.create_share_instance(
                status=status, share_id='fake_id'),
            db_utils.create_share_instance(
                status=constants.STATUS_ERROR_DELETING, share_id='fake_id'),
        ]

        share1 = db_utils.create_share(instances=instance_list)
        share2 = db_utils.create_share(instances=list(reversed(instance_list)))

        self.assertEqual(
            constants.STATUS_REVERTING, share1.instance['status'])
        self.assertEqual(
            constants.STATUS_REVERTING, share2.instance['status'])

    @ddt.data(constants.STATUS_AVAILABLE, constants.STATUS_ERROR,
              constants.STATUS_CREATING)
    def test_share_instance_replication_change(self, status):
        # A 'replication_change' instance is expected to be the one
        # reported by share.instance, whatever the parametrized status of
        # its sibling and regardless of list order.
        instance_list = [
            db_utils.create_share_instance(
                status=constants.STATUS_REPLICATION_CHANGE,
                share_id='fake_id'),
            db_utils.create_share_instance(
                status=status, share_id='fake_id'),
            db_utils.create_share_instance(
                status=constants.STATUS_ERROR_DELETING, share_id='fake_id')
        ]

        share1 = db_utils.create_share(instances=instance_list)
        share2 = db_utils.create_share(instances=list(reversed(instance_list)))

        self.assertEqual(
            constants.STATUS_REPLICATION_CHANGE, share1.instance['status'])
        self.assertEqual(
            constants.STATUS_REPLICATION_CHANGE, share2.instance['status'])

    def test_share_instance_prefer_active_instance(self):
        # Two of the four instances have an 'active' replica_state (one in
        # 'error', one in 'managing'). The expected pick is the 'error'
        # one, even though an 'available' in-sync instance exists.
        instance_list = [
            db_utils.create_share_instance(
                status=constants.STATUS_AVAILABLE,
                share_id='fake_id',
                replica_state=constants.REPLICA_STATE_IN_SYNC),
            db_utils.create_share_instance(
                status=constants.STATUS_CREATING,
                share_id='fake_id',
                replica_state=constants.REPLICA_STATE_OUT_OF_SYNC),
            db_utils.create_share_instance(
                status=constants.STATUS_ERROR, share_id='fake_id',
                replica_state=constants.REPLICA_STATE_ACTIVE),
            db_utils.create_share_instance(
                status=constants.STATUS_MANAGING, share_id='fake_id',
                replica_state=constants.REPLICA_STATE_ACTIVE),
        ]

        share1 = db_utils.create_share(instances=instance_list)
        share2 = db_utils.create_share(instances=list(reversed(instance_list)))

        self.assertEqual(
            constants.STATUS_ERROR, share1.instance['status'])
        self.assertEqual(
            constants.STATUS_ERROR, share2.instance['status'])

    def test_access_rules_status_no_instances(self):
        # With no instances at all, access_rules_status is expected to be
        # 'active'.
        share = db_utils.create_share(instances=[])

        self.assertEqual(constants.STATUS_ACTIVE, share.access_rules_status)

    @ddt.data(constants.STATUS_ACTIVE, constants.SHARE_INSTANCE_RULES_SYNCING,
              constants.SHARE_INSTANCE_RULES_ERROR)
    def test_access_rules_status(self, access_status):
        # Two instances have 'active' access rules and the third carries
        # the parametrized status. The share is expected to report the
        # parametrized status.
        instances = [
            db_utils.create_share_instance(
                share_id='fake_id', status=constants.STATUS_ERROR,
                access_rules_status=constants.STATUS_ACTIVE),
            db_utils.create_share_instance(
                share_id='fake_id', status=constants.STATUS_AVAILABLE,
                access_rules_status=constants.STATUS_ACTIVE),
            db_utils.create_share_instance(
                share_id='fake_id', status=constants.STATUS_AVAILABLE,
                access_rules_status=access_status),
        ]

        share = db_utils.create_share(instances=instances)

        self.assertEqual(access_status, share.access_rules_status)

172 

173 

@ddt.ddt
class ShareAccessTestCase(test.TestCase):
    """Testing of SQLAlchemy Share Access related model classes."""

    @ddt.data(constants.ACCESS_STATE_QUEUED_TO_APPLY,
              constants.ACCESS_STATE_ACTIVE, constants.ACCESS_STATE_ERROR,
              constants.ACCESS_STATE_APPLYING)
    def test_share_access_mapping_state(self, expected_status):
        # One access rule is mapped onto four share instances. Two mappings
        # are 'active', one gets the parametrized state and one is marked
        # deleted. The rule read back from the DB is expected to report the
        # parametrized state.
        admin_ctxt = context.get_admin_context()

        share = db_utils.create_share()
        # The share's own instance first, followed by three extra ones.
        share_instances = [share.instance] + [
            db_utils.create_share_instance(share_id=share['id'])
            for _ in range(3)
        ]
        created_rule = db_utils.create_access(share_id=share['id'])

        # Per-instance mapping updates, in the same order as the instances.
        mapping_updates = [
            {'state': constants.ACCESS_STATE_ACTIVE},
            {'state': expected_status},
            {'state': constants.ACCESS_STATE_ACTIVE},
            {'deleted': 'True', 'state': constants.STATUS_DELETED},
        ]

        # Update the access mapping states
        for share_instance, values in zip(share_instances, mapping_updates):
            db_api.share_instance_access_update(
                admin_ctxt, created_rule['id'], share_instance['id'], values)

        fetched_rule = db_api.share_access_get(admin_ctxt, created_rule['id'])

        self.assertEqual(expected_status, fetched_rule['state'])

210 

211 

class ShareSnapshotTestCase(test.TestCase):
    """Testing of SQLAlchemy ShareSnapshot model class."""

    def test_instance_and_proxified_properties(self):
        # Builds a share with four instances (in-sync, active, out-of-sync
        # replicas plus a non-replica) and one snapshot instance per share
        # instance, then checks the values the snapshot exposes.
        share_id = 'fake_id'
        snapshot_id = 'fake_snapshot_id'

        in_sync = db_utils.create_share_instance(
            status=constants.STATUS_AVAILABLE, share_id=share_id,
            replica_state=constants.REPLICA_STATE_IN_SYNC)
        active = db_utils.create_share_instance(
            status=constants.STATUS_AVAILABLE, share_id=share_id,
            replica_state=constants.REPLICA_STATE_ACTIVE)
        out_of_sync = db_utils.create_share_instance(
            status=constants.STATUS_ERROR, share_id=share_id,
            replica_state=constants.REPLICA_STATE_OUT_OF_SYNC)
        non_replica = db_utils.create_share_instance(
            status=constants.STATUS_CREATING, share_id=share_id)

        share = db_utils.create_share(
            instances=[in_sync, active, out_of_sync, non_replica])

        # Keyword arguments for each snapshot instance, in creation order.
        snapshot_instance_specs = [
            {'status': constants.STATUS_CREATING,
             'share_instance_id': out_of_sync['id']},
            {'status': constants.STATUS_ERROR,
             'share_instance_id': in_sync['id']},
            {'status': constants.STATUS_AVAILABLE,
             'provider_location': 'hogsmeade:snapshot1',
             'progress': '87%',
             'share_instance_id': active['id']},
            {'status': constants.STATUS_MANAGING,
             'share_instance_id': non_replica['id']},
        ]
        snapshot_instances = [
            db_utils.create_snapshot_instance(snapshot_id, **spec)
            for spec in snapshot_instance_specs
        ]

        snapshot = db_utils.create_snapshot(
            id=snapshot_id, share_id=share['id'],
            instances=snapshot_instances)

        # Proxified properties
        expected_proxified = [
            ('status', constants.STATUS_AVAILABLE),
            ('aggregate_status', constants.STATUS_ERROR),
            ('provider_location', 'hogsmeade:snapshot1'),
            ('progress', '87%'),
        ]
        for key, expected in expected_proxified:
            self.assertEqual(expected, snapshot[key])

        # Snapshot properties
        self.assertEqual('-'.join(['share', share['id']]),
                         snapshot['share_name'])
        self.assertEqual(active['id'],
                         snapshot['instance']['share_instance_id'])