Coverage for manila/tests/api/v2/test_metadata.py: 100%
89 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
13from unittest import mock
15import ddt
16import webob
18from manila.api.v2 import metadata
19from manila import context
20from manila import exception
21from manila import policy
22from manila import test
23from manila.tests.api import fakes
24from manila.tests import db_utils
@ddt.ddt
class MetadataAPITest(test.TestCase):
    """Unit tests for ``metadata.MetadataController`` using shares.

    Each test works on a share created with ``db_utils.create_share`` and
    calls the controller's private handlers directly.
    """

    def _get_request(self, version="2.65", use_admin_context=False):
        """Return a blank fake request for the share metadata URL.

        :param version: API version passed to the fake request.
        :param use_admin_context: forwarded to the fake request unchanged.
        :returns: the object built by ``fakes.HTTPRequest.blank``.
        """
        # NOTE: the '{resource_id}' placeholder is never formatted here; the
        # tests hand the resource id to the controller as a separate argument.
        return fakes.HTTPRequest.blank(
            '/v2/shares/{resource_id}/metadata',
            version=version, use_admin_context=use_admin_context)

    def setUp(self):
        """Build the controller, stub the policy check and create a share."""
        super().setUp()
        self.controller = metadata.MetadataController()
        self.controller.resource_name = 'share'
        # The policy check succeeds by default; the policy tests below
        # install their own mock on top of this one.
        self.mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(return_value=True))
        self.resource = db_utils.create_share(size=1)

    # NOTE: "pubic" is a typo for "public"; the name is kept as-is so the
    # test ID does not change.
    def test__get_resource_policy_not_authorized_pubic_resource(self):
        """PolicyNotAuthorized propagates when fetching a public share."""
        fake_context = context.RequestContext('fake', 'fake', True)
        policy_exception = exception.PolicyNotAuthorized(action='share:get')
        mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(side_effect=policy_exception))
        share_obj = db_utils.create_share(size=1, is_public=True)

        self.assertRaises(
            exception.PolicyNotAuthorized,
            self.controller._get_resource,
            fake_context,
            share_obj['id'],
            for_modification=True,
        )

        mock_policy_check.assert_called_once_with(
            fake_context, 'share', 'get', mock.ANY)
        # The fourth positional argument is the resource handed to the
        # policy check; it must be the share that was requested.
        policy_resource_obj = mock_policy_check.call_args[0][3]
        self.assertEqual(share_obj['id'], policy_resource_obj['id'])

    @ddt.data(True, False)
    def test__get_resource_policy_not_authorized_private_resource(self, formd):
        """A failed policy check on a private share yields HTTPNotFound.

        :param formd: value passed as ``for_modification``.
        """
        fake_context = context.RequestContext('fake', 'fake', True)
        mock_policy_check = self.mock_object(
            policy, 'check_policy', mock.Mock(return_value=False))
        share_obj = db_utils.create_share(size=1, is_public=False)

        self.assertRaises(
            webob.exc.HTTPNotFound,
            self.controller._get_resource,
            fake_context,
            share_obj['id'],
            for_modification=formd,
        )

        mock_policy_check.assert_called_once_with(
            fake_context, 'share', 'get', mock.ANY, do_raise=False)
        policy_resource_obj = mock_policy_check.call_args[0][3]
        self.assertEqual(share_obj['id'], policy_resource_obj['id'])

    def test_create_index_metadata(self):
        """Metadata returned by create matches what index returns."""
        url = self._get_request()
        body = {'metadata': {'test_key1': 'test_v1', 'test_key2': 'test_v2'}}
        update = self.controller._create_metadata(
            url, self.resource['id'], body=body)

        get = self.controller._index_metadata(url, self.resource['id'])

        self.assertEqual(2, len(get['metadata']))
        self.assertEqual(update['metadata'], get['metadata'])

    @ddt.data(({'metadata': {'key1': 'v1'}}, 'key1'),
              ({'metadata': {'test_key1': 'test_v1'}}, 'test_key1'),
              ({'metadata': {'key1': 'v2'}}, 'key1'))
    @ddt.unpack
    def test_update_get_metadata_item(self, body, key):
        """Updating one item is reflected by both index and show.

        :param body: request body holding a single metadata pair.
        :param key: the metadata key being updated and then shown.
        """
        url = self._get_request()
        update = self.controller._update_metadata_item(
            url, self.resource['id'], body=body, key=key)
        self.assertEqual(body, update)

        get = self.controller._index_metadata(url, self.resource['id'])

        # Count the stored items, not the keys of the response wrapper
        # (which always has exactly one key, 'metadata').
        self.assertEqual(1, len(get['metadata']))
        self.assertEqual(body['metadata'], get['metadata'])

        get_item = self.controller._show_metadata(
            url, self.resource['id'], key=key)
        self.assertEqual({'meta': body['metadata']}, get_item)

    @ddt.data({'metadata': {'key1': 'v1', 'key2': 'v2'}},
              {'metadata': {'test_key1': 'test_v1'}},
              {'metadata': {'key1': 'v2'}})
    def test_update_all_metadata(self, body):
        """Update-all echoes the body and index returns the same metadata.

        :param body: request body holding the complete metadata dict.
        """
        url = self._get_request()
        update = self.controller._update_all_metadata(
            url, self.resource['id'], body=body)
        self.assertEqual(body, update)

        get = self.controller._index_metadata(url, self.resource['id'])

        self.assertEqual(len(body['metadata']), len(get['metadata']))
        self.assertEqual(body['metadata'], get['metadata'])

    def test_delete_metadata(self):
        """Deleting one key leaves the other key in place."""
        body = {'metadata': {'test_key3': 'test_v3', 'testkey': 'testval'}}
        url = self._get_request()
        self.controller._create_metadata(url, self.resource['id'], body=body)

        self.controller._delete_metadata(
            url, self.resource['id'], 'test_key3')
        show_result = self.controller._index_metadata(
            url, self.resource['id'])

        self.assertEqual(1, len(show_result['metadata']))
        self.assertNotIn('test_key3', show_result['metadata'])

    def test_update_metadata_with_resource_id_not_found(self):
        """Creating metadata on an unknown resource id raises HTTPNotFound."""
        url = self._get_request()
        # Named ``resource_id`` rather than ``id`` to avoid shadowing the
        # builtin.
        resource_id = 'invalid_id'
        body = {'metadata': {'key1': 'v1'}}

        self.assertRaises(
            webob.exc.HTTPNotFound,
            self.controller._create_metadata,
            url, resource_id, body)

    def test_update_metadata_with_body_error(self):
        """A body without the 'metadata' key raises HTTPBadRequest."""
        self.assertRaises(
            webob.exc.HTTPBadRequest,
            self.controller._create_metadata,
            self._get_request(), self.resource['id'],
            {'metadata_error': {'key1': 'v1'}})

    @ddt.data({'metadata': {'key1': 'v1', 'key2': None}},
              {'metadata': {None: 'v1', 'key2': 'v2'}},
              {'metadata': {'k' * 256: 'v2'}},
              {'metadata': {'key1': 'v' * 1024}})
    def test_update_metadata_with_invalid_metadata(self, body):
        """Bodies with a None key/value or oversized key/value raise 400.

        :param body: complete request body carrying the invalid metadata.
        """
        # Each datum is already a full request body, so it is passed as-is
        # instead of being unpacked into a ``metadata`` argument (which
        # shadowed the imported ``metadata`` module) and wrapped again.
        self.assertRaises(
            webob.exc.HTTPBadRequest,
            self.controller._create_metadata,
            self._get_request(), self.resource['id'],
            body)

    def test_delete_metadata_not_found(self):
        """Deleting a key that was never set raises MetadataItemNotFound."""
        body = {'metadata': {'test_key_exist': 'test_v_exist'}}
        update = self.controller._update_all_metadata(
            self._get_request(), self.resource['id'], body=body)
        self.assertEqual(body, update)

        self.assertRaises(
            exception.MetadataItemNotFound,
            self.controller._delete_metadata,
            self._get_request(), self.resource['id'], 'key1')