Coverage for manila/tests/share/test_snapshot_access.py: 100%

65 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 Hitachi Data Systems, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import copy 

17from unittest import mock 

18 

19import ddt 

20 

21from manila.common import constants 

22from manila import context 

23from manila import db 

24from manila import exception 

25from manila.share import snapshot_access 

26from manila import test 

27from manila.tests import db_utils 

28from manila import utils 

29 

30 

@ddt.ddt
class SnapshotAccessTestCase(test.TestCase):
    """Unit tests for ShareSnapshotInstanceAccess.update_access_rules()."""

    def setUp(self):
        """Build the access helper around a mocked driver plus DB fixtures."""
        super().setUp()
        self.driver = self.mock_class(
            "manila.share.driver.ShareDriver", mock.Mock())
        self.snapshot_access = snapshot_access.ShareSnapshotInstanceAccess(
            db, self.driver)
        self.context = context.get_admin_context()

        parent_share = db_utils.create_share()
        self.snapshot = db_utils.create_snapshot(
            share_id=parent_share['id'])
        self.snapshot_instance = db_utils.create_snapshot_instance(
            snapshot_id=self.snapshot['id'],
            share_instance_id=self.snapshot['share']['instance']['id'])

    @staticmethod
    def _build_rules(state, count=2):
        """Return ``count`` access rule dicts that all carry ``state``."""
        return [
            {
                'id': 'id-%s' % index,
                'state': state,
                'access_id': 'rule_id%s' % index,
            }
            for index in range(count)
        ]

    @ddt.data(constants.ACCESS_STATE_QUEUED_TO_APPLY,
              constants.ACCESS_STATE_QUEUED_TO_DENY)
    def test_update_access_rules(self, state):
        """Queued rules are handed to the driver as add or delete rules.

        The mocked DB listing also contains one rule in the error state;
        the expected driver call does not include that rule.
        """
        queued_rules = self._build_rules(state)
        errored_rule = {
            'id': 'id-3',
            'state': constants.ACCESS_STATE_ERROR,
            'access_id': 'rule_id3',
        }
        # The DB mock hands out copies, so ``queued_rules`` stays pristine
        # for the assertions below.
        db_rules = copy.deepcopy(queued_rules) + [errored_rule]

        instance_get_mock = self.mock_object(
            db, 'share_snapshot_instance_get',
            mock.Mock(return_value=self.snapshot_instance))
        rules_get_mock = self.mock_object(
            db, 'share_snapshot_access_get_all_for_snapshot_instance',
            mock.Mock(return_value=db_rules))
        self.mock_object(db, 'share_snapshot_instance_access_update')
        self.mock_object(self.driver, 'snapshot_update_access')
        self.mock_object(self.snapshot_access, '_check_needs_refresh',
                         mock.Mock(return_value=False))
        self.mock_object(db, 'share_snapshot_instance_access_delete')

        self.snapshot_access.update_access_rules(
            self.context, self.snapshot_instance['id'])

        instance_get_mock.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance['id'], with_share_data=True)
        rules_get_mock.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance['id'])

        # Rules queued to apply are expected both as the access list and as
        # add_rules; rules queued to deny are expected only as delete_rules.
        if state == constants.ACCESS_STATE_QUEUED_TO_APPLY:
            expected_access, expected_add, expected_delete = (
                queued_rules, queued_rules, [])
        else:
            expected_access, expected_add, expected_delete = (
                [], [], queued_rules)
        self.driver.snapshot_update_access.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance, expected_access,
            add_rules=expected_add, delete_rules=expected_delete,
            share_server=None)

    def test_update_access_rules_delete_all_rules(self):
        """With delete_all_rules=True every rule is passed as a delete rule."""
        deny_rules = self._build_rules(constants.ACCESS_STATE_QUEUED_TO_DENY)

        instance_get_mock = self.mock_object(
            db, 'share_snapshot_instance_get',
            mock.Mock(return_value=self.snapshot_instance))
        # First DB listing returns the rules, the second returns none.
        rules_get_mock = self.mock_object(
            db, 'share_snapshot_access_get_all_for_snapshot_instance',
            mock.Mock(side_effect=[deny_rules, []]))
        self.mock_object(db, 'share_snapshot_instance_access_update')
        self.mock_object(self.driver, 'snapshot_update_access')
        self.mock_object(db, 'share_snapshot_instance_access_delete')

        self.snapshot_access.update_access_rules(
            self.context, self.snapshot_instance['id'],
            delete_all_rules=True)

        instance_get_mock.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance['id'], with_share_data=True)
        rules_get_mock.assert_called_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance['id'])
        self.driver.snapshot_update_access.assert_called_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance, [],
            add_rules=[], delete_rules=deny_rules, share_server=None)

    def test_update_access_rules_exception(self):
        """A NotFound raised by the driver propagates to the caller."""
        applying_rules = self._build_rules(constants.ACCESS_STATE_APPLYING)

        instance_get_mock = self.mock_object(
            db, 'share_snapshot_instance_get',
            mock.Mock(return_value=self.snapshot_instance))
        rules_get_mock = self.mock_object(
            db, 'share_snapshot_access_get_all_for_snapshot_instance',
            mock.Mock(return_value=applying_rules))
        self.mock_object(db, 'share_snapshot_instance_access_update')
        self.mock_object(self.driver, 'snapshot_update_access',
                         mock.Mock(side_effect=exception.NotFound))

        self.assertRaises(
            exception.NotFound,
            self.snapshot_access.update_access_rules,
            self.context, self.snapshot_instance['id'])

        instance_get_mock.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance['id'], with_share_data=True)
        rules_get_mock.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance['id'])
        self.driver.snapshot_update_access.assert_called_once_with(
            utils.IsAMatcher(context.RequestContext),
            self.snapshot_instance, applying_rules,
            add_rules=applying_rules, delete_rules=[], share_server=None)