Coverage for manila/tests/share/drivers/test_glusterfs.py: 100%

261 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2014 Red Hat, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import copy 

17import socket 

18from unittest import mock 

19 

20import ddt 

21from oslo_config import cfg 

22 

23from manila import context 

24from manila import exception 

25from manila.share import configuration as config 

26from manila.share.drivers import ganesha 

27from manila.share.drivers import glusterfs 

28from manila.share.drivers.glusterfs import layout 

29from manila import test 

30from manila.tests import fake_share 

31from manila.tests import fake_utils 

32 

33 

34CONF = cfg.CONF 

35 

36 

# Attribute values handed to mock.Mock(**...) to stand in for a gluster
# manager object in the helper test cases below.
fake_gluster_manager_attrs = {
    # Address components of the fake volume.
    'export': '127.0.0.1:/testvol',
    'host': '127.0.0.1',
    'qualified': 'testuser@127.0.0.1:/testvol',
    'user': 'testuser',
    'volume': 'testvol',
    # Fake credentials.
    'path_to_private_key': '/fakepath/to/privatekey',
    'remote_server_password': 'fakepassword',
}

fake_share_name = 'fakename'

# Gluster volume option names referenced by the assertions in this module.
NFS_EXPORT_DIR = 'nfs.export-dir'
NFS_EXPORT_VOL = 'nfs.export-volumes'
NFS_RPC_AUTH_ALLOW = 'nfs.rpc-auth-allow'
NFS_RPC_AUTH_REJECT = 'nfs.rpc-auth-reject'

52 

53 

@ddt.ddt
class GlusterfsShareDriverTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterfsShareDriver."""

    def setUp(self):
        """Create a driver backed by the fake executor and a bare config."""
        super().setUp()
        fake_utils.stub_out_utils_execute(self)
        self._execute = fake_utils.fake_execute
        self._context = context.get_admin_context()
        self.addCleanup(fake_utils.fake_execute_set_repliers, [])
        self.addCleanup(fake_utils.fake_execute_clear_log)

        # The reserved_* values are the ones asserted in
        # test_update_share_stats.
        config_defaults = (
            ('reserved_share_percentage', 50),
            ('reserved_share_from_snapshot_percentage', 30),
            ('reserved_share_extend_percentage', 30),
            ('driver_handles_share_servers', False),
        )
        for option_name, option_value in config_defaults:
            CONF.set_default(option_name, option_value)

        self.fake_conf = config.Configuration(None)
        self._driver = glusterfs.GlusterfsShareDriver(
            execute=self._execute, configuration=self.fake_conf)
        self.share = fake_share.fake_share(share_proto='NFS')

    def test_do_setup(self):
        """do_setup() fetches a helper and calls the layout base class."""
        base_cls = layout.GlusterfsShareDriverBase
        self.mock_object(self._driver, '_get_helper')
        self.mock_object(base_cls, 'do_setup')
        fake_ctx = mock.Mock()

        self._driver.do_setup(fake_ctx)

        self._driver._get_helper.assert_called_once_with()
        base_cls.do_setup.assert_called_once_with(fake_ctx)

    @ddt.data(True, False)
    def test_setup_via_manager(self, has_parent):
        """_setup_via_manager() returns what the helper's get_export does."""
        manager = mock.Mock()
        parent_info = None
        if has_parent:
            parent_info = mock.Mock()
        helper = mock.Mock()
        helper.get_export = mock.Mock(return_value='host:/vol')
        self._driver.nfs_helper = mock.Mock(return_value=helper)

        export = self._driver._setup_via_manager(
            {'manager': manager, 'share': self.share},
            share_manager_parent=parent_info)

        manager.set_vol_option.assert_called_once_with(
            'nfs.export-volumes', False)
        self._driver.nfs_helper.assert_called_once_with(
            self._execute, self.fake_conf, gluster_manager=manager)
        helper.get_export.assert_called_once_with(self.share)
        self.assertEqual('host:/vol', export)

    @ddt.data({'helpercls': None, 'path': '/fakepath'},
              {'helpercls': None, 'path': None},
              {'helpercls': glusterfs.GlusterNFSHelper, 'path': '/fakepath'},
              {'helpercls': glusterfs.GlusterNFSHelper, 'path': None})
    @ddt.unpack
    def test_setup_via_manager_path(self, helpercls, path):
        """Check which volume option is set per helper class and path."""
        manager = mock.Mock(path=path)
        if helpercls is None:
            # No real helper class requested: substitute a mock factory.
            fake_helper = mock.Mock()
            fake_helper.get_export = mock.Mock(return_value='host:/vol')
            helpercls = mock.Mock(return_value=fake_helper)
        self._driver.nfs_helper = helpercls
        real_helper_without_path = (
            helpercls == glusterfs.GlusterNFSHelper and path is None)
        if real_helper_without_path:
            manager.get_vol_option = mock.Mock(return_value=True)

        self._driver._setup_via_manager(
            {'manager': manager, 'share': self.share})

        if not real_helper_without_path:
            manager.set_vol_option.assert_called_once_with(
                NFS_EXPORT_VOL, False)
            return
        manager.get_vol_option.assert_called_once_with(
            NFS_EXPORT_VOL, boolean=True)
        manager.set_vol_option.assert_called_once_with(
            NFS_RPC_AUTH_REJECT, '*')

    def test_setup_via_manager_export_volumes_off(self):
        """A falsy nfs.export-volumes option raises GlusterfsException."""
        manager = mock.Mock(path=None)
        manager.get_vol_option = mock.Mock(return_value=False)
        self._driver.nfs_helper = glusterfs.GlusterNFSHelper
        setup_info = {'manager': manager, 'share': self.share}

        self.assertRaises(exception.GlusterfsException,
                          self._driver._setup_via_manager,
                          setup_info)

        manager.get_vol_option.assert_called_once_with(
            NFS_EXPORT_VOL, boolean=True)

    def test_check_for_setup_error(self):
        """check_for_setup_error() runs without raising."""
        self._driver.check_for_setup_error()

    def test_update_share_stats(self):
        """_update_share_stats() passes the expected dict to the base."""
        base_cls = layout.GlusterfsShareDriverBase
        self.mock_object(base_cls, '_update_share_stats')
        expected_stats = {
            'storage_protocol': 'NFS',
            'vendor_name': 'Red Hat',
            'share_backend_name': 'GlusterFS',
            'reserved_percentage': 50,
            'reserved_snapshot_percentage': 30,
            'reserved_share_extend_percentage': 30,
        }

        self._driver._update_share_stats()

        base_cls._update_share_stats.assert_called_once_with(expected_stats)

    def test_get_network_allocations_number(self):
        """get_network_allocations_number() returns 0."""
        allocations = self._driver.get_network_allocations_number()
        self.assertEqual(0, allocations)

    def test_get_helper(self):
        """_get_helper() without a manager returns an nfs_helper instance."""
        helper = self._driver._get_helper()
        self.assertIsInstance(helper, self._driver.nfs_helper)

    @ddt.data({'path': '/fakepath', 'helper': glusterfs.GlusterNFSHelper},
              {'path': None, 'helper': glusterfs.GlusterNFSVolHelper})
    @ddt.unpack
    def test_get_helper_vol(self, path, helper):
        """_get_helper(gmgr) returns the class expected for the path."""
        self._driver.nfs_helper = glusterfs.GlusterNFSHelper
        manager = mock.Mock(path=path)

        result = self._driver._get_helper(manager)

        self.assertIsInstance(result, helper)

    @ddt.data('type', 'level')
    def test_supported_access_features(self, feature):
        """The driver attribute equals the one set on nfs_helper."""
        attr_name = f'supported_access_{feature}s'
        fake_helper = mock.Mock()
        feature_value = mock.Mock()
        setattr(fake_helper, attr_name, feature_value)
        self.mock_object(self._driver, 'nfs_helper', fake_helper)

        result = getattr(self._driver, attr_name)

        self.assertEqual(feature_value, result)

    def test_update_access_via_manager(self):
        """_update_access_via_manager() calls the helper's update_access."""
        self.mock_object(self._driver, '_get_helper')
        manager = mock.Mock()
        rules_to_add = mock.Mock()
        rules_to_delete = mock.Mock()

        self._driver._update_access_via_manager(
            manager, self._context, self.share,
            rules_to_add, rules_to_delete, recovery=True)

        self._driver._get_helper.assert_called_once_with(manager)
        fake_helper = self._driver._get_helper()
        fake_helper.update_access.assert_called_once_with(
            '/', self.share, rules_to_add, rules_to_delete, recovery=True)

207 

208 

@ddt.ddt
class GlusterNFSHelperTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterNFSHelper."""

    def setUp(self):
        """Create a helper around a mocked gluster manager."""
        super().setUp()
        fake_utils.stub_out_utils_execute(self)
        manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self.fake_conf = config.Configuration(None)
        self._helper = glusterfs.GlusterNFSHelper(
            self._execute, self.fake_conf, gluster_manager=manager)

    def _make_path_helper(self, export_dir_dict):
        """Return (helper, manager) with path '/fakename'.

        The helper's _get_export_dir_dict is replaced by a mock that
        returns ``export_dir_dict``.
        """
        manager_attrs = {'path': '/fakename', **fake_gluster_manager_attrs}
        manager = mock.Mock(**manager_attrs)
        helper = glusterfs.GlusterNFSHelper(
            self._execute, self.fake_conf, gluster_manager=manager)
        helper._get_export_dir_dict = mock.Mock(return_value=export_dir_dict)
        return helper, manager

    def test_get_export(self):
        """get_export() returns the manager's 'export' attribute."""
        export = self._helper.get_export(mock.Mock())

        self.assertEqual(fake_gluster_manager_attrs['export'], export)

    @ddt.data({'output_str': '/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)',
               'expected': {'foo': ['10.0.0.1', '10.0.0.2'],
                            'bar': ['10.0.0.1']}},
              {'output_str': None, 'expected': {}})
    @ddt.unpack
    def test_get_export_dir_dict(self, output_str, expected):
        """_get_export_dir_dict() parses the nfs.export-dir option."""
        manager = self._helper.gluster_manager
        self.mock_object(manager, 'get_vol_option',
                         mock.Mock(return_value=output_str))

        parsed = self._helper._get_export_dir_dict()

        self.assertEqual(expected, parsed)
        manager.get_vol_option.assert_called_once_with(NFS_EXPORT_DIR)

    @ddt.data({'delta': (['10.0.0.2'], []), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1|10.0.0.2)'},
              {'delta': (['10.0.0.1'], []), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1)'},
              {'delta': ([], ['10.0.0.2']), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1)'},
              {'delta': ([], ['10.0.0.1']), 'extra_exports': {},
               'new_exports': None},
              {'delta': ([], ['10.0.0.1']),
               'extra_exports': {'elsewhere': ['10.0.1.3']},
               'new_exports': '/elsewhere(10.0.1.3)'})
    @ddt.unpack
    def test_update_access(self, delta, extra_exports, new_exports):
        """update_access() sets nfs.export-dir to the expected value."""
        current_exports = {'fakename': ['10.0.0.1'], **extra_exports}
        helper, manager = self._make_path_helper(current_exports)
        fake_share_obj = mock.Mock()
        added, removed = delta
        # Rules are handed over as single-pass iterators of rule dicts.
        add_rules = ({'access_to': address} for address in added)
        delete_rules = ({'access_to': address} for address in removed)

        helper.update_access('/', fake_share_obj, add_rules, delete_rules)

        helper._get_export_dir_dict.assert_called_once_with()
        manager.set_vol_option.assert_called_once_with(
            NFS_EXPORT_DIR, new_exports)

    @ddt.data({}, {'elsewhere': '10.0.1.3'})
    def test_update_access_disjoint(self, export_dir_dict):
        """Deleting a rule absent from the exports sets no option."""
        helper, manager = self._make_path_helper(export_dir_dict)
        fake_share_obj = mock.Mock()
        rules_to_delete = [{'access_to': '10.0.0.2'}]

        helper.update_access('/', fake_share_obj, [], rules_to_delete)

        helper._get_export_dir_dict.assert_called_once_with()
        self.assertFalse(manager.set_vol_option.called)

288 

289 

@ddt.ddt
class GlusterNFSVolHelperTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterNFSVolHelper."""

    def setUp(self):
        """Create a volume helper around a mocked gluster manager."""
        super().setUp()
        fake_utils.stub_out_utils_execute(self)
        manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self.fake_conf = config.Configuration(None)
        self._helper = glusterfs.GlusterNFSVolHelper(
            self._execute, self.fake_conf, gluster_manager=manager)

    @ddt.data({'output_str': '10.0.0.1,10.0.0.2',
               'expected': ['10.0.0.1', '10.0.0.2']},
              {'output_str': None, 'expected': []})
    @ddt.unpack
    def test_get_vol_exports(self, output_str, expected):
        """_get_vol_exports() splits the nfs.rpc-auth-allow option."""
        manager = self._helper.gluster_manager
        self.mock_object(manager, 'get_vol_option',
                         mock.Mock(return_value=output_str))

        exports = self._helper._get_vol_exports()

        self.assertEqual(expected, exports)
        manager.get_vol_option.assert_called_once_with(NFS_RPC_AUTH_ALLOW)

    @ddt.data({'delta': (["10.0.0.1"], []), 'expected': "10.0.0.1,10.0.0.3"},
              {'delta': (["10.0.0.2"], []),
               'expected': "10.0.0.1,10.0.0.2,10.0.0.3"},
              {'delta': ([], ["10.0.0.1"]), 'expected': "10.0.0.3"},
              {'delta': ([], ["10.0.0.2"]), 'expected': "10.0.0.1,10.0.0.3"})
    @ddt.unpack
    def test_update_access(self, delta, expected):
        """update_access() sets the allow list and unsets the reject list."""
        self.mock_object(
            self._helper, '_get_vol_exports',
            mock.Mock(return_value=["10.0.0.1", "10.0.0.3"]))
        fake_share_obj = mock.Mock()
        added, removed = delta
        # Rules are handed over as single-pass iterators of rule dicts.
        add_rules = ({'access_to': address} for address in added)
        delete_rules = ({'access_to': address} for address in removed)

        self._helper.update_access(
            "/", fake_share_obj, add_rules, delete_rules)

        self._helper._get_vol_exports.assert_called_once_with()
        expected_calls = [
            mock.call(NFS_RPC_AUTH_ALLOW, expected),
            mock.call(NFS_RPC_AUTH_REJECT, None),
        ]
        self.assertEqual(
            expected_calls,
            self._helper.gluster_manager.set_vol_option.call_args_list)

    def test_update_access_empty(self):
        """Removing the only export unsets allow and rejects '*'."""
        self.mock_object(
            self._helper, '_get_vol_exports',
            mock.Mock(return_value=["10.0.0.1"]))
        fake_share_obj = mock.Mock()
        rules_to_delete = [{'access_to': "10.0.0.1"}]

        self._helper.update_access("/", fake_share_obj, [], rules_to_delete)

        self._helper._get_vol_exports.assert_called_once_with()
        expected_calls = [
            mock.call(NFS_RPC_AUTH_ALLOW, None),
            mock.call(NFS_RPC_AUTH_REJECT, "*"),
        ]
        self.assertEqual(
            expected_calls,
            self._helper.gluster_manager.set_vol_option.call_args_list)

352 

353 

class GaneshaNFSHelperTestCase(test.TestCase):
    """Unit tests for glusterfs.GaneshaNFSHelper."""

    def setUp(self):
        """Create a GaneshaNFSHelper with its collaborators mocked out.

        RootExecutor, the parent GaneshaNASHelper.__init__ and
        socket.gethostname are all patched through mock_object, so none
        of the replacements is installed by plain assignment.
        """
        super().setUp()
        self.gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self._root_execute = mock.Mock(return_value=('', ''))
        self.access = fake_share.fake_access()
        self.fake_conf = config.Configuration(None)
        self.fake_template = {'key': 'value'}
        self.share = fake_share.fake_share()
        self.mock_object(glusterfs.ganesha_utils, 'RootExecutor',
                         mock.Mock(return_value=self._root_execute))
        self.mock_object(glusterfs.ganesha.GaneshaNASHelper, '__init__',
                         mock.Mock())
        # Patch via mock_object instead of assigning to socket.gethostname
        # directly: a bare assignment is never undone and would leave the
        # stdlib function replaced after this test finishes.
        self.mock_object(socket, 'gethostname',
                         mock.Mock(return_value='example.com'))
        self._helper = glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        self._helper.tag = 'GLUSTER-Ganesha-localhost'

    def test_init_local_ganesha_server(self):
        """Without a server IP the helper is built on a RootExecutor."""
        glusterfs.ganesha_utils.RootExecutor.assert_called_once_with(
            self._execute)
        socket.gethostname.assert_has_calls([mock.call()])
        glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
            [mock.call(self._root_execute, self.fake_conf,
                       tag='GLUSTER-Ganesha-example.com')])

    def test_get_export(self):
        """get_export() returns the expected location string."""
        ret = self._helper.get_export(self.share)

        self.assertEqual('example.com:/fakename--<access-id>', ret)

    def test_init_remote_ganesha_server(self):
        """With glusterfs_ganesha_server_ip set an SSHExecutor is used."""
        ssh_execute = mock.Mock(return_value=('', ''))
        CONF.set_default('glusterfs_ganesha_server_ip', 'fakeip')
        self.mock_object(glusterfs.ganesha_utils, 'SSHExecutor',
                         mock.Mock(return_value=ssh_execute))

        glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)

        glusterfs.ganesha_utils.SSHExecutor.assert_called_once_with(
            'fakeip', 22, None, 'root', password=None, privatekey=None)
        glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
            [mock.call(ssh_execute, self.fake_conf,
                       tag='GLUSTER-Ganesha-fakeip')])

    def test_init_helper(self):
        """init_helper() stores per-tag data that a second helper reuses."""
        ganeshelper = mock.Mock()
        exptemp = mock.Mock()

        def set_attributes(*a, **kw):
            # Stand-in for the parent init_helper: set the two attributes
            # that are asserted on below.
            self._helper.ganesha = ganeshelper
            self._helper.export_template = exptemp

        self.mock_object(ganesha.GaneshaNASHelper, 'init_helper',
                         mock.Mock(side_effect=set_attributes))
        self.assertEqual({}, glusterfs.GaneshaNFSHelper.shared_data)
        # shared_data is a class attribute that is populated below; empty
        # it again afterwards so the entry does not outlive this test.
        self.addCleanup(glusterfs.GaneshaNFSHelper.shared_data.clear)

        self._helper.init_helper()

        ganesha.GaneshaNASHelper.init_helper.assert_called_once_with()
        self.assertEqual(ganeshelper, self._helper.ganesha)
        self.assertEqual(exptemp, self._helper.export_template)
        self.assertEqual({
            'GLUSTER-Ganesha-localhost': {
                'ganesha': ganeshelper,
                'export_template': exptemp}},
            glusterfs.GaneshaNFSHelper.shared_data)

        other_helper = glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        other_helper.tag = 'GLUSTER-Ganesha-localhost'

        other_helper.init_helper()

        self.assertEqual(ganeshelper, other_helper.ganesha)
        self.assertEqual(exptemp, other_helper.export_template)

    def test_default_config_hook(self):
        """_default_config_hook() patches the base template with conf."""
        fake_conf_dict = {'key': 'value1'}
        mock_ganesha_utils_patch = mock.Mock()

        def fake_patch_run(tmpl1, tmpl2):
            # Record a snapshot of tmpl1 before mutating it in place.
            mock_ganesha_utils_patch(
                copy.deepcopy(tmpl1), tmpl2)
            tmpl1.update(tmpl2)

        self.mock_object(glusterfs.ganesha.GaneshaNASHelper,
                         '_default_config_hook',
                         mock.Mock(return_value=self.fake_template))
        self.mock_object(glusterfs.ganesha_utils, 'path_from',
                         mock.Mock(return_value='/fakedir/glusterfs/conf'))
        self.mock_object(self._helper, '_load_conf_dir',
                         mock.Mock(return_value=fake_conf_dict))
        self.mock_object(glusterfs.ganesha_utils, 'patch',
                         mock.Mock(side_effect=fake_patch_run))

        ret = self._helper._default_config_hook()

        (glusterfs.ganesha.GaneshaNASHelper._default_config_hook.
            assert_called_once_with())
        glusterfs.ganesha_utils.path_from.assert_called_once_with(
            glusterfs.__file__, 'conf')
        self._helper._load_conf_dir.assert_called_once_with(
            '/fakedir/glusterfs/conf')
        glusterfs.ganesha_utils.patch.assert_called_once_with(
            self.fake_template, fake_conf_dict)
        self.assertEqual(fake_conf_dict, ret)

    def test_fsal_hook(self):
        """_fsal_hook() returns the expected Hostname/Volume/Volpath dict."""
        self._helper.gluster_manager.path = '/fakename'
        output = {
            'Hostname': '127.0.0.1',
            'Volume': 'testvol',
            'Volpath': '/fakename'
        }

        ret = self._helper._fsal_hook('/fakepath', self.share, self.access)

        self.assertEqual(output, ret)