Coverage for manila/tests/api/v2/test_metadata.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Licensed under the Apache License, Version 2.0 (the "License"); you may 

2# not use this file except in compliance with the License. You may obtain 

3# a copy of the License at 

4# 

5# http://www.apache.org/licenses/LICENSE-2.0 

6# 

7# Unless required by applicable law or agreed to in writing, software 

8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

10# License for the specific language governing permissions and limitations 

11# under the License. 

12 

13from unittest import mock 

14 

15import ddt 

16import webob 

17 

18from manila.api.v2 import metadata 

19from manila import context 

20from manila import exception 

21from manila import policy 

22from manila import test 

23from manila.tests.api import fakes 

24from manila.tests import db_utils 

25 

26 

@ddt.ddt
class MetadataAPITest(test.TestCase):
    """Exercise ``MetadataController`` with a share as the resource.

    ``setUp`` replaces ``policy.check_policy`` with a mock returning ``True``
    and creates one share (``self.resource``) that most tests operate on.
    """

    def _get_request(self, version="2.65", use_admin_context=False):
        """Build a fake request aimed at the share metadata URL.

        :param version: API microversion handed to the fake request.
        :param use_admin_context: forwarded to ``fakes.HTTPRequest.blank``.
        :returns: the object produced by ``fakes.HTTPRequest.blank``.
        """
        return fakes.HTTPRequest.blank(
            '/v2/shares/{resource_id}/metadata',
            version=version, use_admin_context=use_admin_context)

    def setUp(self):
        """Create the controller, stub the policy check, create a share."""
        super(MetadataAPITest, self).setUp()
        self.controller = metadata.MetadataController()
        self.controller.resource_name = 'share'
        self.mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(return_value=True))
        self.resource = db_utils.create_share(size=1)

    # NOTE: "pubic" in the method name is a typo for "public" (the share is
    # created with is_public=True). The name is kept so that existing test
    # IDs stay stable.
    def test__get_resource_policy_not_authorized_pubic_resource(self):
        """PolicyNotAuthorized from the policy check reaches the caller.

        Uses a public share and ``for_modification=True``.
        """
        fake_context = context.RequestContext('fake', 'fake', True)
        not_authorized = exception.PolicyNotAuthorized(action='share:get')
        mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(side_effect=not_authorized))
        share_obj = db_utils.create_share(size=1, is_public=True)

        self.assertRaises(
            exception.PolicyNotAuthorized,
            self.controller._get_resource,
            fake_context,
            share_obj['id'],
            for_modification=True,
        )

        mock_policy_check.assert_called_once_with(
            fake_context, 'share', 'get', mock.ANY)
        # The 4th positional argument is the resource given to the policy
        # check; it must be the share that was looked up.
        checked_resource = mock_policy_check.call_args[0][3]
        self.assertEqual(share_obj['id'], checked_resource['id'])

    @ddt.data(True, False)
    def test__get_resource_policy_not_authorized_private_resource(self, formd):
        """A failed policy check on a private share yields HTTPNotFound.

        :param formd: value passed as ``for_modification``; both ``True``
            and ``False`` are expected to produce the same outcome.
        """
        fake_context = context.RequestContext('fake', 'fake', True)
        mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(return_value=False))
        share_obj = db_utils.create_share(size=1, is_public=False)

        self.assertRaises(
            webob.exc.HTTPNotFound,
            self.controller._get_resource,
            fake_context,
            share_obj['id'],
            for_modification=formd,
        )

        mock_policy_check.assert_called_once_with(
            fake_context, 'share', 'get', mock.ANY, do_raise=False)
        checked_resource = mock_policy_check.call_args[0][3]
        self.assertEqual(share_obj['id'], checked_resource['id'])

    def test_create_index_metadata(self):
        """Metadata that was created is returned unchanged by index."""
        req = self._get_request()
        body = {'metadata': {'test_key1': 'test_v1', 'test_key2': 'test_v2'}}

        created = self.controller._create_metadata(
            req, self.resource['id'], body=body)
        listed = self.controller._index_metadata(req, self.resource['id'])

        self.assertEqual(2, len(listed['metadata']))
        self.assertEqual(created['metadata'], listed['metadata'])

    @ddt.data(({'metadata': {'key1': 'v1'}}, 'key1'),
              ({'metadata': {'test_key1': 'test_v1'}}, 'test_key1'),
              ({'metadata': {'key1': 'v2'}}, 'key1'))
    @ddt.unpack
    def test_update_get_metadata_item(self, body, key):
        """Update a single item, then read it back via index and show.

        :param body: request body holding exactly one metadata item.
        :param key: the key of that item.
        """
        req = self._get_request()
        updated = self.controller._update_metadata_item(
            req, self.resource['id'], body=body, key=key)
        self.assertEqual(body, updated)

        listed = self.controller._index_metadata(req, self.resource['id'])

        # Count the stored items, not the keys of the response envelope:
        # len(listed) is 1 for any {'metadata': ...} response and would pass
        # no matter how many items exist.
        self.assertEqual(1, len(listed['metadata']))
        self.assertEqual(body['metadata'], listed['metadata'])

        shown = self.controller._show_metadata(
            req, self.resource['id'], key=key)
        self.assertEqual({'meta': body['metadata']}, shown)

    @ddt.data({'metadata': {'key1': 'v1', 'key2': 'v2'}},
              {'metadata': {'test_key1': 'test_v1'}},
              {'metadata': {'key1': 'v2'}})
    def test_update_all_metadata(self, body):
        """Updating all metadata echoes the body and index returns it.

        :param body: request body with the complete metadata dict.
        """
        req = self._get_request()
        updated = self.controller._update_all_metadata(
            req, self.resource['id'], body=body)
        self.assertEqual(body, updated)

        listed = self.controller._index_metadata(req, self.resource['id'])

        self.assertEqual(len(body['metadata']), len(listed['metadata']))
        self.assertEqual(body['metadata'], listed['metadata'])

    def test_delete_metadata(self):
        """Deleting one of two keys leaves only the other one."""
        body = {'metadata': {'test_key3': 'test_v3', 'testkey': 'testval'}}
        req = self._get_request()
        self.controller._create_metadata(req, self.resource['id'], body=body)

        self.controller._delete_metadata(
            req, self.resource['id'], 'test_key3')
        remaining = self.controller._index_metadata(
            req, self.resource['id'])

        self.assertEqual(1, len(remaining['metadata']))
        self.assertNotIn('test_key3', remaining['metadata'])

    def test_update_metadata_with_resource_id_not_found(self):
        """Creating metadata for an unknown resource id is HTTPNotFound."""
        req = self._get_request()
        # Named resource_id rather than id to avoid shadowing the builtin.
        resource_id = 'invalid_id'
        body = {'metadata': {'key1': 'v1'}}

        self.assertRaises(
            webob.exc.HTTPNotFound,
            self.controller._create_metadata,
            req, resource_id, body)

    def test_update_metadata_with_body_error(self):
        """A body without the 'metadata' key is rejected as a bad request."""
        self.assertRaises(
            webob.exc.HTTPBadRequest,
            self.controller._create_metadata,
            self._get_request(), self.resource['id'],
            {'metadata_error': {'key1': 'v1'}})

    # ddt.unpack expands each dict below into keyword arguments, so the
    # test receives the inner dict through its ``metadata`` parameter (which
    # shadows the imported ``metadata`` module inside this method only).
    @ddt.data({'metadata': {'key1': 'v1', 'key2': None}},
              {'metadata': {None: 'v1', 'key2': 'v2'}},
              {'metadata': {'k' * 256: 'v2'}},
              {'metadata': {'key1': 'v' * 1024}})
    @ddt.unpack
    def test_update_metadata_with_invalid_metadata(self, metadata):
        """Invalid keys or values are rejected as a bad request.

        :param metadata: metadata dict containing a ``None`` key or value,
            a 256-character key, or a 1024-character value.
        """
        self.assertRaises(
            webob.exc.HTTPBadRequest,
            self.controller._create_metadata,
            self._get_request(), self.resource['id'],
            {'metadata': metadata})

    def test_delete_metadata_not_found(self):
        """Deleting a key that is not stored raises MetadataItemNotFound."""
        body = {'metadata': {'test_key_exist': 'test_v_exist'}}
        updated = self.controller._update_all_metadata(
            self._get_request(), self.resource['id'], body=body)
        self.assertEqual(body, updated)

        self.assertRaises(
            exception.MetadataItemNotFound,
            self.controller._delete_metadata,
            self._get_request(), self.resource['id'], 'key1')