Coverage for manila/tests/share/drivers/nexenta/ns5/test_nexenta_nas.py: 98%

310 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2019 Nexenta by DDN, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from unittest import mock 

17 

18import ddt 

19from oslo_utils import units 

20 

21from manila import context 

22from manila.share.drivers.nexenta.ns5 import jsonrpc 

23from manila.share.drivers.nexenta.ns5 import nexenta_nas 

24from manila import test 

25 

# Dotted paths used as prefixes for mock.patch targets.
RPC_PATH = 'manila.share.drivers.nexenta.ns5.jsonrpc'
DRV_PATH = 'manila.share.drivers.nexenta.ns5.nexenta_nas.NexentaNasDriver'

# Driver version the stats test expects the driver to report.
DRIVER_VERSION = '1.1'

# Share fixtures and the dataset paths the tests expect for them, given the
# pool ('pool1'), folder ('nfs_share') and name prefix ('share-') that the
# test case configures in setUp().
SHARE = {'share_id': 'uuid', 'size': 1, 'share_proto': 'NFS'}
SHARE_PATH = 'pool1/nfs_share/share-uuid'
SHARE2 = {'share_id': 'uuid2', 'size': 2, 'share_proto': 'NFS'}
SHARE2_PATH = 'pool1/nfs_share/share-uuid2'

# Snapshot fixture taken from SHARE. It holds a reference to SHARE itself,
# so tests must not mutate SHARE in place.
SNAPSHOT = {
    'snapshot_id': 'snap_id',
    'share': SHARE,
    'snapshot_path': '%s@%s' % (SHARE_PATH, 'snapshot-snap_id')}

37 

38 

@ddt.ddt
class TestNexentaNasDriver(test.TestCase):
    """Unit tests for NexentaNasDriver.

    jsonrpc.NefRequest is replaced with a mock in setUp(), and individual
    tests patch the jsonrpc collection methods they need, so no test talks
    to a real appliance.
    """

    def setUp(self):
        """Create a driver from a mocked configuration and set it up."""
        super(TestNexentaNasDriver, self).setUp()
        self.ctx = context.get_admin_context()

        self.cfg = mock.Mock()
        self.cfg.nexenta_nas_host = '1.1.1.1'
        self.cfg.nexenta_rest_addresses = ['2.2.2.2']
        self.cfg.nexenta_rest_port = 8080
        self.cfg.nexenta_rest_protocol = 'auto'
        self.cfg.nexenta_rest_retry_count = 3
        self.cfg.nexenta_user = 'user'
        self.cfg.nexenta_password = 'password'
        self.cfg.nexenta_pool = 'pool1'
        self.cfg.nexenta_folder = 'nfs_share'
        self.cfg.nexenta_share_name_prefix = 'share-'
        self.cfg.nexenta_mount_point_base = '$state_path/mnt'
        self.cfg.nexenta_dataset_record_size = 131072
        self.cfg.nexenta_dataset_compression = 'on'
        self.cfg.nexenta_thin_provisioning = False
        self.cfg.reserved_share_percentage = 0
        self.cfg.reserved_share_from_snapshot_percentage = 0
        self.cfg.reserved_share_extend_percentage = 0
        self.cfg.max_over_subscription_ratio = 20.0
        self.cfg.share_backend_name = 'NexentaStor5'
        self.cfg.enabled_share_protocols = 'NFS'
        self.cfg.network_config_group = 'DEFAULT'
        self.cfg.admin_network_config_group = (
            'fake_admin_network_config_group')
        self.cfg.driver_handles_share_servers = False
        self.cfg.safe_get = self.fake_safe_get

        # Keep the driver from building real REST requests.
        self.mock_object(jsonrpc, 'NefRequest')
        self.drv = nexenta_nas.NexentaNasDriver(configuration=self.cfg)
        self.drv.do_setup(self.ctx)

    def fake_safe_get(self, key):
        """Return option ``key`` from the fake config, or None if absent."""
        return getattr(self.cfg, key, None)

    def parse_fqdn(self, fqdn):
        """Build the expected NFS access rule for ``fqdn``.

        ``fqdn`` is either ``address`` or ``address/mask``. The mask, when
        present, is returned as the raw string and the rule type becomes
        'network' instead of 'fqdn'.
        """
        address_mask = fqdn.strip().split('/', 1)
        rule = {"allow": True, "etype": "fqdn", "entity": address_mask[0]}
        if len(address_mask) == 2:
            rule['mask'] = address_mask[1]
            rule['etype'] = 'network'
        return rule

    def _expected_clone_payload(self, share):
        """Return the clone payload expected when cloning into ``share``."""
        size = int(share['size'] * units.Gi * 1.1)
        return {
            'targetPath': SHARE2_PATH,
            'referencedQuotaSize': size,
            'recordSize': self.cfg.nexenta_dataset_record_size,
            'compressionMode': self.cfg.nexenta_dataset_compression,
            'nonBlockingMandatoryMode': False,
            'referencedReservationSize': size
        }

    def test_backend_name(self):
        """The driver exposes the configured backend name."""
        self.assertEqual('NexentaStor5', self.drv.share_backend_name)

    @mock.patch('%s._get_provisioned_capacity' % DRV_PATH)
    @mock.patch('manila.share.drivers.nexenta.ns5.'
                'jsonrpc.NefServices.get')
    @mock.patch('manila.share.drivers.nexenta.ns5.'
                'jsonrpc.NefFilesystems.set')
    @mock.patch('manila.share.drivers.nexenta.ns5.'
                'jsonrpc.NefFilesystems.get')
    def test_check_for_setup_error(self, get_filesystem, set_filesystem,
                                   get_service, prov_capacity):
        """Exercise the success, reconfigure and failure paths."""

        def root_fs(**overrides):
            # A healthy root filesystem, with selected properties replaced.
            filesystem = {
                'mountPoint': '/path/to/volume',
                'nonBlockingMandatoryMode': False,
                'smartCompression': False,
                'isMounted': True
            }
            filesystem.update(overrides)
            return filesystem

        prov_capacity.return_value = 1
        get_service.return_value = {'state': 'online'}
        set_filesystem.return_value = {}

        # Healthy filesystem: nothing has to be changed.
        get_filesystem.return_value = root_fs()
        self.assertIsNone(self.drv.check_for_setup_error())
        get_filesystem.assert_called_with(self.drv.root_path)
        set_filesystem.assert_not_called()
        get_service.assert_called_with('nfs')

        # Every property found enabled is expected to be switched off.
        for enabled in (
                ('nonBlockingMandatoryMode', 'smartCompression'),
                ('smartCompression',),
                ('nonBlockingMandatoryMode',)):
            payload = dict.fromkeys(enabled, False)
            get_filesystem.return_value = root_fs(
                **dict.fromkeys(enabled, True))
            self.assertIsNone(self.drv.check_for_setup_error())
            get_filesystem.assert_called_with(self.drv.root_path)
            set_filesystem.assert_called_with(self.drv.root_path, payload)
            get_service.assert_called_with('nfs')

        # Root filesystem without a mount point.
        get_filesystem.return_value = root_fs(
            mountPoint='none', isMounted=False)
        self.assertRaises(jsonrpc.NefException,
                          self.drv.check_for_setup_error)

        # Root filesystem that is not mounted.
        get_filesystem.return_value = root_fs(isMounted=False)
        self.assertRaises(jsonrpc.NefException,
                          self.drv.check_for_setup_error)

        # NFS service that is not online. The filesystem is healthy again
        # so that the service state is the only possible cause of failure.
        # NOTE(review): this case used to keep the unmounted filesystem and
        # an 'online' service, which merely repeated the previous check.
        get_filesystem.return_value = root_fs()
        get_service.return_value = {'state': 'offline'}
        self.assertRaises(jsonrpc.NefException,
                          self.drv.check_for_setup_error)

    @mock.patch('%s.NefFilesystems.get' % RPC_PATH)
    def test__get_provisioned_capacity(self, fs_get):
        """The referenced quota of the filesystem becomes the capacity."""
        fs_get.return_value = {
            'path': 'pool1/nfs_share/123',
            'referencedQuotaSize': 1 * units.Gi
        }

        self.drv._get_provisioned_capacity()

        self.assertEqual(1 * units.Gi, self.drv.provisioned_capacity)

    @mock.patch('%s._mount_filesystem' % DRV_PATH)
    @mock.patch('%s.NefFilesystems.create' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
    def test_create_share(self, delete_fs, create_fs, mount_fs):
        """A share is created, and deleted again when mounting fails."""
        mount_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
        mount_fs.return_value = mount_path
        size = int(1 * units.Gi * 1.1)
        self.assertEqual(
            [{
                'path': mount_path,
                'id': 'share-uuid'
            }],
            self.drv.create_share(self.ctx, SHARE))

        payload = {
            'recordSize': 131072,
            'compressionMode': self.cfg.nexenta_dataset_compression,
            'path': SHARE_PATH,
            'referencedQuotaSize': size,
            'nonBlockingMandatoryMode': False,
            'referencedReservationSize': size
        }
        self.drv.nef.filesystems.create.assert_called_with(payload)

        # A failing mount must clean up the freshly created filesystem.
        mount_fs.side_effect = jsonrpc.NefException('some error')
        self.assertRaises(jsonrpc.NefException,
                          self.drv.create_share, self.ctx, SHARE)
        delete_payload = {'force': True}
        self.drv.nef.filesystems.delete.assert_called_with(
            SHARE_PATH, delete_payload)

    @mock.patch('%s.NefFilesystems.promote' % RPC_PATH)
    @mock.patch('%s.NefSnapshots.get' % RPC_PATH)
    @mock.patch('%s.NefSnapshots.list' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
    def test_delete_share(self, fs_delete, snap_list, snap_get, fs_promote):
        """An EEXIST on delete promotes the clone before deleting again."""
        delete_payload = {'force': True, 'snapshots': True}
        snapshots_payload = {'parent': SHARE_PATH, 'fields': 'path'}
        clones_payload = {'fields': 'clones,creationTxg'}
        clone_path = '%s:/%s' % (self.cfg.nexenta_nas_host, 'path_to_fs')
        # First delete attempt fails with EEXIST, the second one succeeds.
        fs_delete.side_effect = [
            jsonrpc.NefException({
                'message': 'some_error',
                'code': 'EEXIST'}),
            None]
        snap_list.return_value = [{'path': '%s@snap1' % SHARE_PATH}]
        snap_get.return_value = {'clones': [clone_path], 'creationTxg': 1}
        self.assertIsNone(self.drv.delete_share(self.ctx, SHARE))
        fs_delete.assert_called_with(SHARE_PATH, delete_payload)
        fs_promote.assert_called_with(clone_path)
        snap_get.assert_called_with('%s@snap1' % SHARE_PATH, clones_payload)
        snap_list.assert_called_with(snapshots_payload)

    @mock.patch('%s.NefFilesystems.mount' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.get' % RPC_PATH)
    def test_mount_filesystem(self, fs_get, fs_mount):
        """An unmounted filesystem is mounted and its path is returned."""
        mount_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
        fs_get.return_value = {
            'mountPoint': '/%s' % SHARE_PATH, 'isMounted': False}
        self.assertEqual(mount_path, self.drv._mount_filesystem(SHARE))
        self.drv.nef.filesystems.mount.assert_called_with(SHARE_PATH)

    @mock.patch('%s.NefHpr.activate' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.mount' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.get' % RPC_PATH)
    def test_mount_filesystem_with_activate(
            self, fs_get, fs_mount, hpr_activate):
        """A filesystem without a mount point is activated exactly once."""
        mount_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
        # First lookup reports no mount point, the second one reports it.
        fs_get.side_effect = [
            {'mountPoint': 'none', 'isMounted': False},
            {'mountPoint': '/%s' % SHARE_PATH, 'isMounted': False}]
        self.assertEqual(mount_path, self.drv._mount_filesystem(SHARE))
        payload = {'datasetName': SHARE_PATH}
        self.drv.nef.hpr.activate.assert_called_once_with(payload)

    @mock.patch('%s.NefFilesystems.mount' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.unmount' % RPC_PATH)
    def test_remount_filesystem(self, fs_unmount, fs_mount):
        """Remounting unmounts and mounts the filesystem once each."""
        self.drv._remount_filesystem(SHARE_PATH)
        fs_unmount.assert_called_once_with(SHARE_PATH)
        fs_mount.assert_called_once_with(SHARE_PATH)

    # NOTE: ddt.data passes its value positionally right after ``self`` and
    # mock.patch appends its mocks after that, so ``list_data`` has to be
    # the first test argument. With ``list_data`` last it received a mock,
    # and the create branch below was never reached.
    @ddt.data({'key': 'value'}, {})
    @mock.patch('%s.NefNfs.create' % RPC_PATH)
    @mock.patch('%s.NefNfs.list' % RPC_PATH)
    @mock.patch('%s.NefNfs.set' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.acl' % RPC_PATH)
    def test_update_nfs_access(self, list_data, acl, nfs_set, nfs_list,
                               nfs_create):
        """NFS rules are set on an existing share or created otherwise.

        ``list_data`` is what the NFS share listing returns: a non-empty
        value stands for an existing NFS share, an empty one for none.
        """

        def expected_rules(addresses):
            # The driver is expected to send masks as integers.
            rules = []
            for fqdn in addresses:
                rule = self.parse_fqdn(fqdn)
                if rule.get('mask'):
                    rule['mask'] = int(rule['mask'])
                rules.append(rule)
            return rules

        nfs_list.return_value = list_data
        rw_list = ['1.1.1.1/24', '2.2.2.2']
        ro_list = ['3.3.3.3', '4.4.4.4/30']
        security_contexts = {
            'securityModes': ['sys'],
            'readWriteList': expected_rules(rw_list),
            'readOnlyList': expected_rules(ro_list)
        }

        self.assertIsNone(self.drv._update_nfs_access(SHARE, rw_list, ro_list))
        payload = {
            'flags': ['file_inherit', 'dir_inherit'],
            'permissions': ['full_set'],
            'principal': 'everyone@',
            'type': 'allow'
        }
        self.drv.nef.filesystems.acl.assert_called_with(SHARE_PATH, payload)
        payload = {'securityContexts': [security_contexts]}
        if list_data:
            self.drv.nef.nfs.set.assert_called_with(SHARE_PATH, payload)
        else:
            payload['filesystem'] = SHARE_PATH
            self.drv.nef.nfs.create.assert_called_with(payload)

    def test_update_nfs_access_bad_mask(self):
        """A non-numeric network mask is rejected with ValueError."""
        rw_list = ['1.1.1.1/24', '2.2.2.2/1a']
        ro_list = ['3.3.3.3', '4.4.4.4/30']

        self.assertRaises(ValueError, self.drv._update_nfs_access,
                          SHARE, rw_list, ro_list)

    @mock.patch('%s._update_nfs_access' % DRV_PATH)
    def test_update_access__ip_rw(self, update_nfs_access):
        """A read-write IP rule lands in the read-write list."""
        access = {
            'access_type': 'ip',
            'access_to': '1.1.1.1',
            'access_level': 'rw',
            'access_id': 'fake_id'
        }

        self.assertEqual(
            {'fake_id': {'state': 'active'}},
            self.drv.update_access(
                self.ctx, SHARE, [access], None, None, None))
        self.drv._update_nfs_access.assert_called_with(SHARE, ['1.1.1.1'], [])

    @mock.patch('%s._update_nfs_access' % DRV_PATH)
    def test_update_access__ip_ro(self, update_nfs_access):
        """A read-only IP rule lands in the read-only list."""
        access = {
            'access_type': 'ip',
            'access_to': '1.1.1.1',
            'access_level': 'ro',
            'access_id': 'fake_id'
        }

        expected = {'fake_id': {'state': 'active'}}
        self.assertEqual(
            expected, self.drv.update_access(
                self.ctx, SHARE, [access], None, None, None))
        self.drv._update_nfs_access.assert_called_with(SHARE, [], ['1.1.1.1'])

    @ddt.data('rw', 'ro')
    def test_update_access__not_ip(self, access_level):
        """A rule whose type is not 'ip' is reported in the error state."""
        access = {
            'access_type': 'username',
            'access_to': 'some_user',
            'access_level': access_level,
            'access_id': 'fake_id'
        }
        expected = {'fake_id': {'state': 'error'}}
        self.assertEqual(expected, self.drv.update_access(
            self.ctx, SHARE, [access], None, None, None))

    @mock.patch('%s._get_capacity_info' % DRV_PATH)
    @mock.patch('manila.share.driver.ShareDriver._update_share_stats')
    def test_update_share_stats(self, super_stats, info):
        """The driver stats are assembled from config and capacity info."""
        info.return_value = (100, 90, 10)
        stats = {
            'vendor_name': 'Nexenta',
            'storage_protocol': 'NFS',
            'nfs_mount_point_base': self.cfg.nexenta_mount_point_base,
            'create_share_from_snapshot_support': True,
            'revert_to_snapshot_support': True,
            'snapshot_support': True,
            'driver_version': DRIVER_VERSION,
            'share_backend_name': self.cfg.share_backend_name,
            'pools': [{
                'compression': True,
                'pool_name': 'pool1',
                'total_capacity_gb': 100,
                'free_capacity_gb': 90,
                'provisioned_capacity_gb': 0,
                'max_over_subscription_ratio': 20.0,
                'reserved_percentage': (
                    self.cfg.reserved_share_percentage),
                'reserved_snapshot_percentage': (
                    self.cfg.reserved_share_from_snapshot_percentage),
                'reserved_share_extend_percentage': (
                    self.cfg.reserved_share_extend_percentage),
                'thin_provisioning': self.cfg.nexenta_thin_provisioning,
            }],
        }

        self.drv._update_share_stats()

        self.assertEqual(stats, self.drv._stats)

    def test_get_capacity_info(self):
        """Byte counters are reported as (total, free, used) in GiB."""
        self.drv.nef.get.return_value = {
            'bytesAvailable': 9 * units.Gi, 'bytesUsed': 1 * units.Gi}

        self.assertEqual((10, 9, 1), self.drv._get_capacity_info())

    @mock.patch('%s._set_reservation' % DRV_PATH)
    @mock.patch('%s._set_quota' % DRV_PATH)
    @mock.patch('%s.NefFilesystems.rename' % RPC_PATH)
    @mock.patch('%s.NefFilesystems.get' % RPC_PATH)
    def test_manage_existing(self, fs_get, fs_rename, set_quota, set_res):
        """An existing filesystem is renamed, resized and re-exported."""
        fs_get.return_value = {'referencedQuotaSize': 1073741824}
        old_path = '%s:/%s' % (self.cfg.nexenta_nas_host, 'path_to_fs')
        new_path = '%s:/%s' % (self.cfg.nexenta_nas_host, SHARE_PATH)
        # Work on a copy: SHARE is shared by every test in this module.
        share = dict(SHARE, export_locations=[{'path': old_path}])
        expected = {'size': 2, 'export_locations': [{
            'path': new_path
        }]}
        self.assertEqual(expected, self.drv.manage_existing(share, None))
        fs_rename.assert_called_with('path_to_fs', {'newPath': SHARE_PATH})
        set_res.assert_called_with(share, 2)
        set_quota.assert_called_with(share, 2)

    @mock.patch('%s.NefSnapshots.create' % RPC_PATH)
    def test_create_snapshot(self, snap_create):
        """A snapshot is created at the snapshot path."""
        self.assertIsNone(self.drv.create_snapshot(self.ctx, SNAPSHOT))
        snap_create.assert_called_once_with({
            'path': SNAPSHOT['snapshot_path']})

    @mock.patch('%s.NefSnapshots.delete' % RPC_PATH)
    def test_delete_snapshot(self, snap_delete):
        """A snapshot is deleted with the 'defer' flag set."""
        self.assertIsNone(self.drv.delete_snapshot(self.ctx, SNAPSHOT))
        payload = {'defer': True}
        snap_delete.assert_called_once_with(
            SNAPSHOT['snapshot_path'], payload)

    @mock.patch('%s._mount_filesystem' % DRV_PATH)
    @mock.patch('%s._remount_filesystem' % DRV_PATH)
    @mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
    @mock.patch('%s.NefSnapshots.clone' % RPC_PATH)
    def test_create_share_from_snapshot(
            self, snap_clone, fs_delete, remount_fs, mount_fs):
        """The snapshot is cloned once and the mount path is returned."""
        mount_fs.return_value = 'mount_path'
        location = {
            'path': 'mount_path',
            'id': 'share-uuid2'
        }
        self.assertEqual([location], self.drv.create_share_from_snapshot(
            self.ctx, SHARE2, SNAPSHOT))

        payload = self._expected_clone_payload(SHARE2)
        snap_clone.assert_called_once_with(SNAPSHOT['snapshot_path'], payload)

    @mock.patch('%s._mount_filesystem' % DRV_PATH)
    @mock.patch('%s._remount_filesystem' % DRV_PATH)
    @mock.patch('%s.NefFilesystems.delete' % RPC_PATH)
    @mock.patch('%s.NefSnapshots.clone' % RPC_PATH)
    def test_create_share_from_snapshot_error(
            self, snap_clone, fs_delete, remount_fs, mount_fs):
        """A failing mount triggers one delete and still raises."""
        fs_delete.side_effect = jsonrpc.NefException('delete error')
        mount_fs.side_effect = jsonrpc.NefException('create error')
        self.assertRaises(
            jsonrpc.NefException,
            self.drv.create_share_from_snapshot, self.ctx, SHARE2, SNAPSHOT)

        payload = self._expected_clone_payload(SHARE2)
        snap_clone.assert_called_once_with(SNAPSHOT['snapshot_path'], payload)
        payload = {'force': True}
        fs_delete.assert_called_once_with(SHARE2_PATH, payload)

    @mock.patch('%s.NefFilesystems.rollback' % RPC_PATH)
    def test_revert_to_snapshot(self, fs_rollback):
        """The share filesystem is rolled back to the named snapshot."""
        self.assertIsNone(self.drv.revert_to_snapshot(
            self.ctx, SNAPSHOT, [], []))
        payload = {'snapshot': 'snapshot-snap_id'}
        fs_rollback.assert_called_once_with(
            SHARE_PATH, payload)

    @mock.patch('%s._set_reservation' % DRV_PATH)
    @mock.patch('%s._set_quota' % DRV_PATH)
    def test_extend_share(self, set_quota, set_reservation):
        """Extending updates quota and reservation to the new size."""
        self.assertIsNone(self.drv.extend_share(
            SHARE, 2))
        set_quota.assert_called_once_with(
            SHARE, 2)
        set_reservation.assert_called_once_with(
            SHARE, 2)

    @mock.patch('%s.NefFilesystems.get' % RPC_PATH)
    @mock.patch('%s._set_reservation' % DRV_PATH)
    @mock.patch('%s._set_quota' % DRV_PATH)
    def test_shrink_share(self, set_quota, set_reservation, fs_get):
        """Shrinking updates quota and reservation when the data fits."""
        fs_get.return_value = {
            'bytesUsedBySelf': 0.5 * units.Gi
        }
        self.assertIsNone(self.drv.shrink_share(
            SHARE2, 1))
        set_quota.assert_called_once_with(
            SHARE2, 1)
        set_reservation.assert_called_once_with(
            SHARE2, 1)

    @mock.patch('%s.NefFilesystems.set' % RPC_PATH)
    def test_set_quota(self, fs_set):
        """The quota is written as the size in bytes scaled by 1.1."""
        quota = int(2 * units.Gi * 1.1)
        payload = {'referencedQuotaSize': quota}
        self.assertIsNone(self.drv._set_quota(
            SHARE, 2))
        fs_set.assert_called_once_with(SHARE_PATH, payload)

    @mock.patch('%s.NefFilesystems.set' % RPC_PATH)
    def test_set_reservation(self, fs_set):
        """The reservation is written as the size in bytes scaled by 1.1."""
        reservation = int(2 * units.Gi * 1.1)
        payload = {'referencedReservationSize': reservation}
        self.assertIsNone(self.drv._set_reservation(
            SHARE, 2))
        fs_set.assert_called_once_with(SHARE_PATH, payload)
538 self.assertIsNone(self.drv._set_reservation( 

539 SHARE, 2)) 

540 fs_set.assert_called_once_with(SHARE_PATH, payload)