Coverage for manila/tests/share/drivers/nexenta/ns5/test_nexenta_nas.py: 98%
310 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2019 Nexenta by DDN, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from unittest import mock
18import ddt
19from oslo_utils import units
21from manila import context
22from manila.share.drivers.nexenta.ns5 import jsonrpc
23from manila.share.drivers.nexenta.ns5 import nexenta_nas
24from manila import test
# Dotted import paths used to build mock.patch targets in the tests below.
RPC_PATH = 'manila.share.drivers.nexenta.ns5.jsonrpc'
DRV_PATH = 'manila.share.drivers.nexenta.ns5.nexenta_nas.NexentaNasDriver'
# Version string the stats test expects the driver to report.
DRIVER_VERSION = '1.1'

# Share fixtures and the dataset paths the tests expect for them.
SHARE = {
    'share_id': 'uuid',
    'size': 1,
    'share_proto': 'NFS',
}
SHARE_PATH = 'pool1/nfs_share/share-uuid'
SHARE2 = {
    'share_id': 'uuid2',
    'size': 2,
    'share_proto': 'NFS',
}
SHARE2_PATH = 'pool1/nfs_share/share-uuid2'

# Snapshot fixture taken from SHARE; its path is "<share path>@<name>".
SNAPSHOT = {
    'snapshot_id': 'snap_id',
    'share': SHARE,
    'snapshot_path': '@'.join((SHARE_PATH, 'snapshot-snap_id')),
}
39@ddt.ddt
40class TestNexentaNasDriver(test.TestCase):
def setUp(self):
    """Build a fake configuration and a driver instance for each test.

    ``jsonrpc.NefRequest`` is replaced with a mock before the driver is
    created, and ``do_setup()`` is run so every test starts with a ready
    ``self.drv``.
    """
    super(TestNexentaNasDriver, self).setUp()
    self.ctx = context.get_admin_context()

    self.cfg = mock.Mock()
    # Option lookups made through safe_get() resolve against the
    # attributes assigned below.
    self.cfg.safe_get = self.fake_safe_get
    self.cfg.nexenta_nas_host = '1.1.1.1'
    self.cfg.nexenta_rest_addresses = ['2.2.2.2']
    self.cfg.nexenta_rest_port = 8080
    self.cfg.nexenta_rest_protocol = 'auto'
    self.cfg.nexenta_pool = 'pool1'
    self.cfg.nexenta_dataset_record_size = 131072
    self.cfg.reserved_share_percentage = 0
    self.cfg.reserved_share_from_snapshot_percentage = 0
    self.cfg.reserved_share_extend_percentage = 0
    self.cfg.nexenta_folder = 'nfs_share'
    self.cfg.nexenta_user = 'user'
    self.cfg.share_backend_name = 'NexentaStor5'
    self.cfg.nexenta_password = 'password'
    self.cfg.nexenta_thin_provisioning = False
    self.cfg.nexenta_rest_retry_count = 3
    self.cfg.nexenta_share_name_prefix = 'share-'
    self.cfg.max_over_subscription_ratio = 20.0
    self.cfg.enabled_share_protocols = 'NFS'
    self.cfg.nexenta_mount_point_base = '$state_path/mnt'
    self.cfg.nexenta_dataset_compression = 'on'
    self.cfg.network_config_group = 'DEFAULT'
    self.cfg.admin_network_config_group = (
        'fake_admin_network_config_group')
    self.cfg.driver_handles_share_servers = False

    self.nef_mock = mock.Mock()
    self.mock_object(jsonrpc, 'NefRequest')
    self.drv = nexenta_nas.NexentaNasDriver(configuration=self.cfg)
    self.drv.do_setup(self.ctx)
def fake_safe_get(self, key):
    """Return attribute ``key`` of the fake config, or None if missing."""
    return getattr(self.cfg, key, None)
def test_backend_name(self):
    """The driver reports the backend name set in the configuration."""
    expected_name = 'NexentaStor5'
    self.assertEqual(expected_name, self.drv.share_backend_name)
@mock.patch('%s._get_provisioned_capacity' % DRV_PATH)
@mock.patch('manila.share.drivers.nexenta.ns5.'
            'jsonrpc.NefServices.get')
@mock.patch('manila.share.drivers.nexenta.ns5.'
            'jsonrpc.NefFilesystems.set')
@mock.patch('manila.share.drivers.nexenta.ns5.'
            'jsonrpc.NefFilesystems.get')
def test_check_for_setup_error(self, get_filesystem, set_filesystem,
                               get_service, prov_capacity):
    """Exercise check_for_setup_error() on healthy and broken backends.

    Covered cases, in order:

    * healthy root filesystem: nothing is changed;
    * ``nonBlockingMandatoryMode`` and/or ``smartCompression`` enabled:
      exactly the enabled flags are reset to False;
    * root filesystem without a mount point, or not mounted:
      NefException;
    * healthy filesystem but NFS service not online: NefException.

    NOTE(review): the last case previously repeated the "not mounted"
    case with an unchanged ``'online'`` service, so the service check
    was never exercised on its own. The new case assumes the driver
    raises NefException for any state other than ``'online'`` --
    confirm against NexentaNasDriver.check_for_setup_error().
    """
    def root_filesystem(**overrides):
        # Properties of a healthy, mounted root filesystem; each case
        # overrides only the fields it is about.
        props = {
            'mountPoint': '/path/to/volume',
            'nonBlockingMandatoryMode': False,
            'smartCompression': False,
            'isMounted': True
        }
        props.update(overrides)
        return props

    prov_capacity.return_value = 1
    set_filesystem.return_value = {}
    get_service.return_value = {'state': 'online'}

    # Healthy backend: no property has to be reset.
    get_filesystem.return_value = root_filesystem()
    self.assertIsNone(self.drv.check_for_setup_error())
    get_filesystem.assert_called_with(self.drv.root_path)
    set_filesystem.assert_not_called()
    get_service.assert_called_with('nfs')

    # Each enabled flag must be switched off, and only those flags.
    reset_cases = (
        {'nonBlockingMandatoryMode': True, 'smartCompression': True},
        {'smartCompression': True},
        {'nonBlockingMandatoryMode': True},
    )
    for enabled in reset_cases:
        get_filesystem.return_value = root_filesystem(**enabled)
        payload = dict.fromkeys(enabled, False)
        self.assertIsNone(self.drv.check_for_setup_error())
        get_filesystem.assert_called_with(self.drv.root_path)
        set_filesystem.assert_called_with(self.drv.root_path, payload)
        get_service.assert_called_with('nfs')

    # Root filesystem has no mount point at all.
    get_filesystem.return_value = root_filesystem(
        mountPoint='none', isMounted=False)
    self.assertRaises(jsonrpc.NefException,
                      self.drv.check_for_setup_error)

    # Root filesystem has a mount point but is not mounted.
    get_filesystem.return_value = root_filesystem(isMounted=False)
    self.assertRaises(jsonrpc.NefException,
                      self.drv.check_for_setup_error)

    # Healthy filesystem, but the NFS service is not online.
    get_filesystem.return_value = root_filesystem()
    get_service.return_value = {'state': 'offline'}
    self.assertRaises(jsonrpc.NefException,
                      self.drv.check_for_setup_error)
@mock.patch('%s.NefFilesystems.get' % RPC_PATH)
def test__get_provisioned_capacity(self, get_filesystem):
    """A 1 GiB quota on the filesystem yields 1 GiB provisioned."""
    one_gib = 1 * units.Gi
    get_filesystem.return_value = {
        'path': 'pool1/nfs_share/123',
        'referencedQuotaSize': one_gib,
    }

    self.drv._get_provisioned_capacity()

    self.assertEqual(one_gib, self.drv.provisioned_capacity)
@mock.patch('%s._mount_filesystem' % DRV_PATH)
@mock.patch('%s.NefFilesystems.create' % RPC_PATH)
@mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
def test_create_share(self, fs_delete, fs_create, mount):
    """create_share() returns the export and cleans up on failure.

    The first call checks the returned export location and the payload
    sent to create the filesystem; the second makes mounting fail and
    checks that the filesystem is force-deleted.
    """
    export_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
    mount.return_value = export_path
    expected_bytes = int(1 * units.Gi * 1.1)
    expected_locations = [{'path': export_path, 'id': 'share-uuid'}]

    self.assertEqual(expected_locations,
                     self.drv.create_share(self.ctx, SHARE))

    self.drv.nef.filesystems.create.assert_called_with({
        'recordSize': 131072,
        'compressionMode': self.cfg.nexenta_dataset_compression,
        'path': SHARE_PATH,
        'referencedQuotaSize': expected_bytes,
        'nonBlockingMandatoryMode': False,
        'referencedReservationSize': expected_bytes,
    })

    # A mount failure must propagate and trigger a forced delete.
    mount.side_effect = jsonrpc.NefException('some error')
    self.assertRaises(jsonrpc.NefException,
                      self.drv.create_share, self.ctx, SHARE)
    self.drv.nef.filesystems.delete.assert_called_with(
        SHARE_PATH, {'force': True})
@mock.patch('%s.NefFilesystems.promote' % RPC_PATH)
@mock.patch('%s.NefSnapshots.get' % RPC_PATH)
@mock.patch('%s.NefSnapshots.list' % RPC_PATH)
@mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
def test_delete_share(self, delete_fs, list_snaps, get_snap, promote_fs):
    """delete_share() succeeds after a first EEXIST delete failure.

    The first delete raises EEXIST and the second succeeds; the test
    checks the snapshot listing, the clone lookup, the clone promotion
    and the final delete call.
    """
    snapshot_path = '%s@snap1' % SHARE_PATH
    clone_path = '%s:/%s' % (self.cfg.nexenta_nas_host, 'path_to_fs')
    first_failure = jsonrpc.NefException({
        'message': 'some_error',
        'code': 'EEXIST'})
    delete_fs.side_effect = [first_failure, None]
    list_snaps.return_value = [{'path': snapshot_path}]
    get_snap.return_value = {'clones': [clone_path], 'creationTxg': 1}

    result = self.drv.delete_share(self.ctx, SHARE)

    self.assertIsNone(result)
    delete_fs.assert_called_with(
        SHARE_PATH, {'force': True, 'snapshots': True})
    promote_fs.assert_called_with(clone_path)
    get_snap.assert_called_with(
        snapshot_path, {'fields': 'clones,creationTxg'})
    list_snaps.assert_called_with(
        {'parent': SHARE_PATH, 'fields': 'path'})
@mock.patch('%s.NefFilesystems.mount' % RPC_PATH)
@mock.patch('%s.NefFilesystems.get' % RPC_PATH)
def test_mount_filesystem(self, get_filesystem, mount_filesystem):
    """An unmounted filesystem is mounted and its export path returned."""
    get_filesystem.return_value = {
        'mountPoint': '/%s' % SHARE_PATH,
        'isMounted': False,
    }
    expected = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)

    actual = self.drv._mount_filesystem(SHARE)

    self.assertEqual(expected, actual)
    self.drv.nef.filesystems.mount.assert_called_with(SHARE_PATH)
@mock.patch('%s.NefHpr.activate' % RPC_PATH)
@mock.patch('%s.NefFilesystems.mount' % RPC_PATH)
@mock.patch('%s.NefFilesystems.get' % RPC_PATH)
def test_mount_filesystem_with_activate(
        self, get_filesystem, mount_filesystem, activate):
    """A 'none' mount point triggers one HPR activate for the dataset."""
    without_mount_point = {'mountPoint': 'none', 'isMounted': False}
    with_mount_point = {
        'mountPoint': '/%s' % SHARE_PATH, 'isMounted': False}
    # Successive lookups: first no mount point, then a usable one.
    get_filesystem.side_effect = [without_mount_point, with_mount_point]
    expected = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)

    actual = self.drv._mount_filesystem(SHARE)

    self.assertEqual(expected, actual)
    self.drv.nef.hpr.activate.assert_called_once_with(
        {'datasetName': SHARE_PATH})
@mock.patch('%s.NefFilesystems.mount' % RPC_PATH)
@mock.patch('%s.NefFilesystems.unmount' % RPC_PATH)
def test_remount_filesystem(self, unmount, mount):
    """Remounting calls unmount and mount once each with the path."""
    self.drv._remount_filesystem(SHARE_PATH)

    for rpc_call in (unmount, mount):
        rpc_call.assert_called_once_with(SHARE_PATH)
def parse_fqdn(self, fqdn):
    """Turn ``address[/mask]`` into an allow-rule dictionary.

    Without a ``/`` the rule has etype ``fqdn``. With one, the text
    after the first ``/`` is stored unchanged under ``mask`` and the
    etype becomes ``network``.
    """
    address, slash, mask = fqdn.strip().partition('/')
    rule = {"allow": True, "etype": "fqdn", "entity": address}
    if slash:
        rule['mask'] = mask
        rule['etype'] = 'network'
    return rule
@ddt.data({'key': 'value'}, {})
@mock.patch('%s.NefNfs.create' % RPC_PATH)
@mock.patch('%s.NefNfs.list' % RPC_PATH)
@mock.patch('%s.NefNfs.set' % RPC_PATH)
@mock.patch('%s.NefFilesystems.acl' % RPC_PATH)
def test_update_nfs_access(self, list_data, acl, nfs_set, nfs_list,
                           nfs_create):
    """_update_nfs_access() updates or creates the NFS share entry.

    ``list_data`` is the ddt value and stands for what the NFS share
    listing returns: a non-empty value expects an update through
    ``set``, an empty one expects a ``create`` carrying the filesystem
    path.

    mock.patch appends its mocks after the positional arguments it
    receives, so the ddt value must be the first parameter after
    ``self``. With the value listed last it was bound to a MagicMock,
    which is always truthy, and the create branch was never checked.
    """
    nfs_list.return_value = list_data
    rw_list = ['1.1.1.1/24', '2.2.2.2']
    ro_list = ['3.3.3.3', '4.4.4.4/30']

    def expected_rules(addresses):
        # Rules as expected in the payload: masks are integers there,
        # while parse_fqdn() leaves them as strings.
        rules = []
        for fqdn in addresses:
            rule = self.parse_fqdn(fqdn)
            if rule.get('mask'):
                rule['mask'] = int(rule['mask'])
            rules.append(rule)
        return rules

    security_contexts = {
        'securityModes': ['sys'],
        'readWriteList': expected_rules(rw_list),
        'readOnlyList': expected_rules(ro_list),
    }

    self.assertIsNone(
        self.drv._update_nfs_access(SHARE, rw_list, ro_list))

    acl.assert_called_with(SHARE_PATH, {
        'flags': ['file_inherit', 'dir_inherit'],
        'permissions': ['full_set'],
        'principal': 'everyone@',
        'type': 'allow'
    })
    payload = {'securityContexts': [security_contexts]}
    if list_data:
        nfs_set.assert_called_with(SHARE_PATH, payload)
    else:
        payload['filesystem'] = SHARE_PATH
        nfs_create.assert_called_with(payload)
def test_update_nfs_access_bad_mask(self):
    """A non-numeric mask ('1a') makes _update_nfs_access raise.

    The expected security contexts are not built here: the call is
    expected to fail with ValueError, so no payload is ever compared.
    """
    rw_list = ['1.1.1.1/24', '2.2.2.2/1a']
    ro_list = ['3.3.3.3', '4.4.4.4/30']

    self.assertRaises(ValueError, self.drv._update_nfs_access,
                      SHARE, rw_list, ro_list)
@mock.patch('%s._update_nfs_access' % DRV_PATH)
def test_update_access__ip_rw(self, update_nfs_access):
    """A read-write IP rule becomes active and lands in the rw list."""
    rule = {
        'access_type': 'ip',
        'access_to': '1.1.1.1',
        'access_level': 'rw',
        'access_id': 'fake_id'
    }

    result = self.drv.update_access(
        self.ctx, SHARE, [rule], None, None, None)

    self.assertEqual({'fake_id': {'state': 'active'}}, result)
    self.drv._update_nfs_access.assert_called_with(
        SHARE, ['1.1.1.1'], [])
@mock.patch('%s._update_nfs_access' % DRV_PATH)
def test_update_access__ip_ro(self, update_nfs_access):
    """A read-only IP rule becomes active and lands in the ro list."""
    rule = {
        'access_type': 'ip',
        'access_to': '1.1.1.1',
        'access_level': 'ro',
        'access_id': 'fake_id'
    }

    result = self.drv.update_access(
        self.ctx, SHARE, [rule], None, None, None)

    self.assertEqual({'fake_id': {'state': 'active'}}, result)
    self.drv._update_nfs_access.assert_called_with(
        SHARE, [], ['1.1.1.1'])
@ddt.data('rw', 'ro')
def test_update_access__not_ip(self, access_level):
    """A non-IP (username) rule is reported in the 'error' state."""
    rule = {
        'access_type': 'username',
        'access_to': 'some_user',
        'access_level': access_level,
        'access_id': 'fake_id'
    }

    result = self.drv.update_access(
        self.ctx, SHARE, [rule], None, None, None)

    self.assertEqual({'fake_id': {'state': 'error'}}, result)
@mock.patch('%s._get_capacity_info' % DRV_PATH)
@mock.patch('manila.share.driver.ShareDriver._update_share_stats')
def test_update_share_stats(self, super_stats, info):
    """_update_share_stats() fills ``_stats`` from config and capacity.

    Capacity is stubbed to the tuple (100, 90, 10) and the base class
    implementation is mocked out.
    """
    info.return_value = (100, 90, 10)
    cfg = self.cfg
    expected_pool = {
        'compression': True,
        'pool_name': 'pool1',
        'total_capacity_gb': 100,
        'free_capacity_gb': 90,
        'provisioned_capacity_gb': 0,
        'max_over_subscription_ratio': 20.0,
        'reserved_percentage': cfg.reserved_share_percentage,
        'reserved_snapshot_percentage': (
            cfg.reserved_share_from_snapshot_percentage),
        'reserved_share_extend_percentage': (
            cfg.reserved_share_extend_percentage),
        'thin_provisioning': cfg.nexenta_thin_provisioning,
    }
    expected_stats = {
        'vendor_name': 'Nexenta',
        'storage_protocol': 'NFS',
        'nfs_mount_point_base': cfg.nexenta_mount_point_base,
        'create_share_from_snapshot_support': True,
        'revert_to_snapshot_support': True,
        'snapshot_support': True,
        'driver_version': DRIVER_VERSION,
        'share_backend_name': cfg.share_backend_name,
        'pools': [expected_pool],
    }

    self.drv._update_share_stats()

    self.assertEqual(expected_stats, self.drv._stats)
def test_get_capacity_info(self):
    """9 GiB available plus 1 GiB used is reported as (10, 9, 1)."""
    available_bytes = 9 * units.Gi
    used_bytes = 1 * units.Gi
    self.drv.nef.get.return_value = {
        'bytesAvailable': available_bytes,
        'bytesUsed': used_bytes,
    }

    capacity = self.drv._get_capacity_info()

    self.assertEqual((10, 9, 1), capacity)
@mock.patch('%s._set_reservation' % DRV_PATH)
@mock.patch('%s._set_quota' % DRV_PATH)
@mock.patch('%s.NefFilesystems.rename' % RPC_PATH)
@mock.patch('%s.NefFilesystems.get' % RPC_PATH)
def test_manage_existing(self, fs_get, fs_rename, set_quota, set_res):
    """manage_existing() renames the filesystem and resizes the share.

    The share passed in is a copy of the module-level ``SHARE`` fixture
    with an export location added. Writing ``export_locations`` into
    ``SHARE`` itself would leak into every later test, including via
    ``SNAPSHOT['share']``.

    Mock parameters follow decorator order from the bottom up, so the
    ``_set_quota`` mock comes before the ``_set_reservation`` mock.
    """
    fs_get.return_value = {'referencedQuotaSize': 1073741824}
    old_path = '%s:/%s' % (self.cfg.nexenta_nas_host, 'path_to_fs')
    new_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
    share = dict(SHARE, export_locations=[{'path': old_path}])
    expected = {'size': 2, 'export_locations': [{
        'path': new_path
    }]}

    self.assertEqual(expected, self.drv.manage_existing(share, None))

    fs_rename.assert_called_with('path_to_fs', {'newPath': SHARE_PATH})
    set_res.assert_called_with(share, 2)
    set_quota.assert_called_with(share, 2)
@mock.patch('%s.NefSnapshots.create' % RPC_PATH)
def test_create_snapshot(self, create_snap):
    """create_snapshot() issues one create call with the snapshot path."""
    result = self.drv.create_snapshot(self.ctx, SNAPSHOT)

    self.assertIsNone(result)
    expected_payload = {'path': SNAPSHOT['snapshot_path']}
    create_snap.assert_called_once_with(expected_payload)
@mock.patch('%s.NefSnapshots.delete' % RPC_PATH)
def test_delete_snapshot(self, delete_snap):
    """delete_snapshot() issues one deferred delete for the snapshot."""
    result = self.drv.delete_snapshot(self.ctx, SNAPSHOT)

    self.assertIsNone(result)
    delete_snap.assert_called_once_with(
        SNAPSHOT['snapshot_path'], {'defer': True})
@mock.patch('%s._mount_filesystem' % DRV_PATH)
@mock.patch('%s._remount_filesystem' % DRV_PATH)
@mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
@mock.patch('%s.NefSnapshots.clone' % RPC_PATH)
def test_create_share_from_snapshot(
        self, clone_snap, delete_fs, remount, mount):
    """Cloning a snapshot returns the export location of the new share.

    Also checks the clone payload: target path, quota and reservation
    sized from SHARE2, and the dataset options from the configuration.
    """
    mount.return_value = 'mount_path'
    expected_locations = [{
        'path': 'mount_path',
        'id': 'share-uuid2'
    }]

    locations = self.drv.create_share_from_snapshot(
        self.ctx, SHARE2, SNAPSHOT)

    self.assertEqual(expected_locations, locations)
    expected_bytes = int(SHARE2['size'] * units.Gi * 1.1)
    clone_snap.assert_called_once_with(SNAPSHOT['snapshot_path'], {
        'targetPath': SHARE2_PATH,
        'referencedQuotaSize': expected_bytes,
        'recordSize': self.cfg.nexenta_dataset_record_size,
        'compressionMode': self.cfg.nexenta_dataset_compression,
        'nonBlockingMandatoryMode': False,
        'referencedReservationSize': expected_bytes,
    })
@mock.patch('%s._mount_filesystem' % DRV_PATH)
@mock.patch('%s._remount_filesystem' % DRV_PATH)
@mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
@mock.patch('%s.NefSnapshots.clone' % RPC_PATH)
def test_create_share_from_snapshot_error(
        self, clone_snap, delete_fs, remount, mount):
    """A mount failure after cloning raises and attempts a cleanup.

    Both the mount and the cleanup delete are made to fail. The call
    must still raise NefException, after exactly one clone and exactly
    one forced delete of the target path.
    """
    mount.side_effect = jsonrpc.NefException('create error')
    delete_fs.side_effect = jsonrpc.NefException('delete error')

    self.assertRaises(
        jsonrpc.NefException,
        self.drv.create_share_from_snapshot, self.ctx, SHARE2, SNAPSHOT)

    expected_bytes = int(SHARE2['size'] * units.Gi * 1.1)
    clone_snap.assert_called_once_with(SNAPSHOT['snapshot_path'], {
        'targetPath': SHARE2_PATH,
        'referencedQuotaSize': expected_bytes,
        'recordSize': self.cfg.nexenta_dataset_record_size,
        'compressionMode': self.cfg.nexenta_dataset_compression,
        'nonBlockingMandatoryMode': False,
        'referencedReservationSize': expected_bytes,
    })
    delete_fs.assert_called_once_with(SHARE2_PATH, {'force': True})
@mock.patch('%s.NefFilesystems.rollback' % RPC_PATH)
def test_revert_to_snapshot(self, rollback):
    """Reverting rolls the share path back to the named snapshot."""
    result = self.drv.revert_to_snapshot(self.ctx, SNAPSHOT, [], [])

    self.assertIsNone(result)
    rollback.assert_called_once_with(
        SHARE_PATH, {'snapshot': 'snapshot-snap_id'})
@mock.patch('%s._set_reservation' % DRV_PATH)
@mock.patch('%s._set_quota' % DRV_PATH)
def test_extend_share(self, set_quota, set_reservation):
    """Extending sets quota and reservation once with the new size."""
    new_size = 2

    self.assertIsNone(self.drv.extend_share(SHARE, new_size))

    for setter in (set_quota, set_reservation):
        setter.assert_called_once_with(SHARE, new_size)
@mock.patch('%s.NefFilesystems.get' % RPC_PATH)
@mock.patch('%s._set_reservation' % DRV_PATH)
@mock.patch('%s._set_quota' % DRV_PATH)
def test_shrink_share(self, set_quota, set_reservation, get_filesystem):
    """Shrinking to 1 GiB with 0.5 GiB used sets quota and reservation."""
    get_filesystem.return_value = {'bytesUsedBySelf': 0.5 * units.Gi}
    new_size = 1

    self.assertIsNone(self.drv.shrink_share(SHARE2, new_size))

    for setter in (set_quota, set_reservation):
        setter.assert_called_once_with(SHARE2, new_size)
@mock.patch('%s.NefFilesystems.set' % RPC_PATH)
def test_set_quota(self, set_filesystem):
    """_set_quota() sends the size in bytes scaled by 1.1 as the quota."""
    expected_bytes = int(2 * units.Gi * 1.1)

    result = self.drv._set_quota(SHARE, 2)

    self.assertIsNone(result)
    set_filesystem.assert_called_once_with(
        SHARE_PATH, {'referencedQuotaSize': expected_bytes})
@mock.patch('%s.NefFilesystems.set' % RPC_PATH)
def test_set_reservation(self, set_filesystem):
    """_set_reservation() sends the size in bytes scaled by 1.1."""
    expected_bytes = int(2 * units.Gi * 1.1)

    result = self.drv._set_reservation(SHARE, 2)

    self.assertIsNone(result)
    set_filesystem.assert_called_once_with(
        SHARE_PATH, {'referencedReservationSize': expected_bytes})