Coverage for manila/tests/share/drivers/hdfs/test_hdfs_native.py: 100%

258 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2015 Intel, Corp. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15"""Unit tests for HDFS native protocol driver module.""" 

16 

17import socket 

18from unittest import mock 

19 

20from oslo_concurrency import processutils 

21from oslo_config import cfg 

22 

23from manila import context 

24from manila import exception 

25import manila.share.configuration as config 

26import manila.share.drivers.hdfs.hdfs_native as hdfs_native 

27from manila import ssh_utils 

28from manila import test 

29from manila.tests import fake_share 

30from manila import utils 

31 

32 

33CONF = cfg.CONF 

34 

35 

class HDFSNativeShareDriverTestCase(test.TestCase):
    """Tests HDFSNativeShareDriver."""

    def setUp(self):
        """Create a driver backed by mocks and fake HDFS/SSH settings."""
        super().setUp()
        self._context = context.get_admin_context()
        self._hdfs_execute = mock.Mock(return_value=('', ''))
        self.local_ip = '192.168.1.1'

        CONF.set_default('driver_handles_share_servers', False)
        CONF.set_default('hdfs_namenode_ip', self.local_ip)
        CONF.set_default('hdfs_ssh_name', 'fake_sshname')
        CONF.set_default('hdfs_ssh_pw', 'fake_sshpw')
        CONF.set_default('hdfs_ssh_private_key', 'fake_sshkey')

        self.fake_conf = config.Configuration(None)
        self._driver = hdfs_native.HDFSNativeShareDriver(
            execute=self._hdfs_execute,
            configuration=self.fake_conf)
        # 'hdfs' is the value test_do_setup expects do_setup() to restore;
        # every other test sees the fake binary name assigned below.
        self.hdfs_bin = 'hdfs'
        self._driver._hdfs_bin = 'fake_hdfs_bin'
        self.share = fake_share.fake_share(share_proto='HDFS')
        self.snapshot = fake_share.fake_snapshot(share_proto='HDFS')
        self.access = fake_share.fake_access(access_type='user')
        self.fakesharepath = 'hdfs://1.2.3.4:5/share-0'
        self.fakesnapshotpath = '/share-0/.snapshot/snapshot-0'

        # Patch the socket lookups through mock_object (the helper this
        # class already uses for utils/ssh_utils/processutils) instead of
        # assigning onto the stdlib module, which would leave the mocks in
        # place for every test that runs afterwards in the same process.
        # NOTE(review): relies on mock_object undoing the patch on cleanup
        # -- confirm against manila.test.TestCase.
        self.mock_object(socket, 'gethostname',
                         mock.Mock(return_value='testserver'))
        self.mock_object(socket, 'gethostbyname_ex',
                         mock.Mock(return_value=(
                             'localhost',
                             ['localhost.localdomain', 'testserver'],
                             ['127.0.0.1', self.local_ip])))

    def test_do_setup(self):
        """do_setup() leaves the driver using the expected hdfs binary."""
        self._driver.do_setup(self._context)
        self.assertEqual(self.hdfs_bin, self._driver._hdfs_bin)

    def test_create_share(self):
        """create_share() creates the share and returns its path."""
        self._driver._create_share = mock.Mock()
        self._driver._get_share_path = mock.Mock(
            return_value=self.fakesharepath)
        result = self._driver.create_share(self._context, self.share,
                                           share_server=None)
        self._driver._create_share.assert_called_once_with(self.share)
        self._driver._get_share_path.assert_called_once_with(self.share)

        self.assertEqual(self.fakesharepath, result)

    def test_create_share_unsupported_proto(self):
        """create_share() rejects a share built without share_proto='HDFS'."""
        self._driver._get_share_path = mock.Mock()
        self.assertRaises(exception.HDFSException,
                          self._driver.create_share,
                          self._context,
                          fake_share.fake_share(),
                          share_server=None)
        self.assertFalse(self._driver._get_share_path.called)

    def test__set_share_size(self):
        """_set_share_size() sets a space quota from the share's size."""
        share_dir = '/' + self.share['name']
        sizestr = str(self.share['size']) + 'g'
        self._driver._hdfs_execute = mock.Mock(return_value=True)
        self._driver._set_share_size(self.share)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfsadmin', '-setSpaceQuota', sizestr, share_dir)

    def test__set_share_size_exception(self):
        """_set_share_size() wraps execution errors in HDFSException."""
        share_dir = '/' + self.share['name']
        sizestr = str(self.share['size']) + 'g'
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver._set_share_size, self.share)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfsadmin', '-setSpaceQuota', sizestr, share_dir)

    def test__set_share_size_with_new_size(self):
        """_set_share_size() uses an explicitly supplied size when given."""
        share_dir = '/' + self.share['name']
        new_size = 'fake_size'
        sizestr = new_size + 'g'
        self._driver._hdfs_execute = mock.Mock(return_value=True)
        self._driver._set_share_size(self.share, new_size)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfsadmin', '-setSpaceQuota', sizestr, share_dir)

    def test__create_share(self):
        """_create_share() makes the dir, sets size and allows snapshots."""
        share_dir = '/' + self.share['name']
        self._driver._hdfs_execute = mock.Mock(return_value=True)
        self._driver._set_share_size = mock.Mock()
        self._driver._create_share(self.share)
        self._driver._hdfs_execute.assert_any_call(
            'fake_hdfs_bin', 'dfs', '-mkdir', share_dir)
        self._driver._set_share_size.assert_called_once_with(self.share)
        self._driver._hdfs_execute.assert_any_call(
            'fake_hdfs_bin', 'dfsadmin', '-allowSnapshot', share_dir)

    def test__create_share_exception(self):
        """_create_share() raises HDFSException when mkdir fails."""
        share_dir = '/' + self.share['name']
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver._create_share, self.share)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-mkdir', share_dir)

    def test_create_share_from_empty_snapshot(self):
        """An empty snapshot listing results in no copy command."""
        return_hdfs_execute = (None, None)
        self._driver._hdfs_execute = mock.Mock(
            return_value=return_hdfs_execute)
        self._driver._create_share = mock.Mock(return_value=True)
        self._driver._get_share_path = mock.Mock(
            return_value=self.fakesharepath)
        self._driver._get_snapshot_path = mock.Mock(
            return_value=self.fakesnapshotpath)
        result = self._driver.create_share_from_snapshot(self._context,
                                                         self.share,
                                                         self.snapshot,
                                                         share_server=None)
        self._driver._create_share.assert_called_once_with(self.share)
        self._driver._get_snapshot_path.assert_called_once_with(
            self.snapshot)
        # Only the listing is issued; assert_called_once_with rules out -cp.
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-ls', self.fakesnapshotpath)
        self._driver._get_share_path.assert_called_once_with(self.share)
        self.assertEqual(self.fakesharepath, result)

    def test_create_share_from_snapshot(self):
        """A non-empty snapshot is listed and then copied into the share."""
        return_hdfs_execute = ("fake_content", None)
        self._driver._hdfs_execute = mock.Mock(
            return_value=return_hdfs_execute)
        self._driver._create_share = mock.Mock(return_value=True)
        self._driver._get_share_path = mock.Mock(
            return_value=self.fakesharepath)
        self._driver._get_snapshot_path = mock.Mock(
            return_value=self.fakesnapshotpath)
        result = self._driver.create_share_from_snapshot(self._context,
                                                         self.share,
                                                         self.snapshot,
                                                         share_server=None)
        self._driver._create_share.assert_called_once_with(self.share)
        self._driver._get_snapshot_path.assert_called_once_with(
            self.snapshot)

        calls = [mock.call('fake_hdfs_bin', 'dfs',
                           '-ls', self.fakesnapshotpath),
                 mock.call('fake_hdfs_bin', 'dfs', '-cp',
                           self.fakesnapshotpath + '/*',
                           '/' + self.share['name'])]

        self._driver._hdfs_execute.assert_has_calls(calls)
        self._driver._get_share_path.assert_called_once_with(self.share)
        self.assertEqual(self.fakesharepath, result)

    def test_create_share_from_snapshot_exception(self):
        """A failing snapshot listing raises before the path is resolved."""
        self._driver._create_share = mock.Mock(return_value=True)
        self._driver._get_snapshot_path = mock.Mock(
            return_value=self.fakesnapshotpath)
        self._driver._get_share_path = mock.Mock(
            return_value=self.fakesharepath)
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver.create_share_from_snapshot,
                          self._context, self.share,
                          self.snapshot, share_server=None)
        self._driver._create_share.assert_called_once_with(self.share)
        self._driver._get_snapshot_path.assert_called_once_with(self.snapshot)

        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-ls', self.fakesnapshotpath)
        self.assertFalse(self._driver._get_share_path.called)

    def test_create_snapshot(self):
        """create_snapshot() issues 'dfs -createSnapshot' for the share."""
        self._driver._hdfs_execute = mock.Mock(return_value=True)
        self._driver.create_snapshot(self._context, self.snapshot,
                                     share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-createSnapshot',
            '/' + self.snapshot['share_name'], self.snapshot['name'])

    def test_create_snapshot_exception(self):
        """create_snapshot() wraps execution errors in HDFSException."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver.create_snapshot, self._context,
                          self.snapshot, share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-createSnapshot',
            '/' + self.snapshot['share_name'], self.snapshot['name'])

    def test_delete_share(self):
        """delete_share() recursively removes the share directory."""
        self._driver._hdfs_execute = mock.Mock(return_value=True)
        self._driver.delete_share(self._context,
                                  self.share,
                                  share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-rm', '-r',
            '/' + self.share['name'])

    def test_delete_share_exception(self):
        """delete_share() wraps execution errors in HDFSException."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver.delete_share,
                          self._context,
                          self.share,
                          share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-rm', '-r',
            '/' + self.share['name'])

    def test_delete_snapshot(self):
        """delete_snapshot() issues 'dfs -deleteSnapshot' for the share."""
        self._driver._hdfs_execute = mock.Mock(return_value=True)
        self._driver.delete_snapshot(self._context,
                                     self.snapshot,
                                     share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-deleteSnapshot',
            '/' + self.snapshot['share_name'], self.snapshot['name'])

    def test_delete_snapshot_exception(self):
        """delete_snapshot() wraps execution errors in HDFSException."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver.delete_snapshot,
                          self._context,
                          self.snapshot,
                          share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfs', '-deleteSnapshot',
            '/' + self.snapshot['share_name'], self.snapshot['name'])

    def test_allow_access(self):
        """allow_access() adds a recursive rwx ACL entry for the user."""
        self._driver._hdfs_execute = mock.Mock(
            return_value=['', ''])
        share_dir = '/' + self.share['name']
        user_access = ':'.join([self.access['access_type'],
                                self.access['access_to'],
                                'rwx'])
        cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R',
               user_access, share_dir]
        self._driver.allow_access(self._context, self.share, self.access,
                                  share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            *cmd, check_exit_code=True)

    def test_allow_access_invalid_access_type(self):
        """allow_access() rejects an unknown access type."""
        self.assertRaises(exception.InvalidShareAccess,
                          self._driver.allow_access,
                          self._context,
                          self.share,
                          fake_share.fake_access(
                              access_type='invalid_access_type'),
                          share_server=None)

    def test_allow_access_invalid_access_level(self):
        """allow_access() rejects an unknown access level."""
        self.assertRaises(exception.InvalidShareAccess,
                          self._driver.allow_access,
                          self._context,
                          self.share,
                          fake_share.fake_access(
                              access_level='invalid_access_level'),
                          share_server=None)

    def test_allow_access_exception(self):
        """allow_access() wraps execution errors in HDFSException."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        share_dir = '/' + self.share['name']
        user_access = ':'.join([self.access['access_type'],
                                self.access['access_to'],
                                'rwx'])
        cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R',
               user_access, share_dir]
        self.assertRaises(exception.HDFSException,
                          self._driver.allow_access,
                          self._context,
                          self.share,
                          self.access,
                          share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            *cmd, check_exit_code=True)

    def test_deny_access(self):
        """deny_access() removes the user's ACL entry recursively."""
        self._driver._hdfs_execute = mock.Mock(return_value=['', ''])
        share_dir = '/' + self.share['name']
        access_name = ':'.join([self.access['access_type'],
                                self.access['access_to']])
        cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R',
               access_name, share_dir]
        self._driver.deny_access(self._context,
                                 self.share,
                                 self.access,
                                 share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            *cmd, check_exit_code=True)

    def test_deny_access_exception(self):
        """deny_access() wraps execution errors in HDFSException."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        share_dir = '/' + self.share['name']
        access_name = ':'.join([self.access['access_type'],
                                self.access['access_to']])
        cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R',
               access_name, share_dir]
        self.assertRaises(exception.HDFSException,
                          self._driver.deny_access,
                          self._context,
                          self.share,
                          self.access,
                          share_server=None)
        self._driver._hdfs_execute.assert_called_once_with(
            *cmd, check_exit_code=True)

    def test_extend_share(self):
        """extend_share() forwards the share and new size unchanged."""
        new_size = "fake_size"
        self._driver._set_share_size = mock.Mock()
        self._driver.extend_share(self.share, new_size)
        self._driver._set_share_size.assert_called_once_with(
            self.share, new_size)

    def test__check_hdfs_state_healthy(self):
        """_check_hdfs_state() is truthy when fsck reports HEALTHY."""
        fake_out = "fakeinfo\n...Status: HEALTHY"
        self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
        result = self._driver._check_hdfs_state()
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'fsck', '/')
        self.assertTrue(result)

    def test__check_hdfs_state_down(self):
        """_check_hdfs_state() is falsy when fsck reports DOWN."""
        fake_out = "fakeinfo\n...Status: DOWN"
        self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
        result = self._driver._check_hdfs_state()
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'fsck', '/')
        self.assertFalse(result)

    def test__check_hdfs_state_exception(self):
        """_check_hdfs_state() wraps execution errors in HDFSException."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver._check_hdfs_state)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'fsck', '/')

    def test__get_available_capacity(self):
        """_get_available_capacity() parses total/free from the report."""
        fake_out = ('Configured Capacity: 2.4\n'
                    'Total Capacity: 2\n'
                    'DFS free: 1')
        self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
        total, free = self._driver._get_available_capacity()
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfsadmin', '-report')
        self.assertEqual(2, total)
        self.assertEqual(1, free)

    def test__get_available_capacity_exception(self):
        """_get_available_capacity() wraps execution errors."""
        self._driver._hdfs_execute = mock.Mock(
            side_effect=exception.ProcessExecutionError)
        self.assertRaises(exception.HDFSException,
                          self._driver._get_available_capacity)
        self._driver._hdfs_execute.assert_called_once_with(
            'fake_hdfs_bin', 'dfsadmin', '-report')

    def test_get_share_stats_refresh_false(self):
        """get_share_stats(False) returns the cached stats."""
        self._driver._stats = {'fake_key': 'fake_value'}
        result = self._driver.get_share_stats(False)
        self.assertEqual(self._driver._stats, result)

    def test_get_share_stats_refresh_true(self):
        """get_share_stats(True) rebuilds stats with the expected keys."""
        self._driver._get_available_capacity = mock.Mock(
            return_value=(11111.0, 12345.0))
        result = self._driver.get_share_stats(True)
        expected_keys = [
            'qos', 'driver_version', 'share_backend_name',
            'free_capacity_gb', 'total_capacity_gb',
            'driver_handles_share_servers',
            'reserved_percentage', 'vendor_name', 'storage_protocol',
            'ipv4_support', 'ipv6_support'
        ]
        for key in expected_keys:
            self.assertIn(key, result)
        self.assertTrue(result['ipv4_support'])
        self.assertFalse(result['ipv6_support'])
        self.assertEqual('HDFS', result['storage_protocol'])
        self._driver._get_available_capacity.assert_called_once_with()

    def test__hdfs_local_execute(self):
        """_hdfs_local_execute() runs the command without root."""
        cmd = 'testcmd'
        self.mock_object(utils, 'execute', mock.Mock(return_value=True))
        self._driver._hdfs_local_execute(cmd)
        utils.execute.assert_called_once_with(cmd, run_as_root=False)

    def test__hdfs_remote_execute(self):
        """_hdfs_remote_execute() hands the command tuple to _run_ssh."""
        self._driver._run_ssh = mock.Mock(return_value=True)
        cmd = 'testcmd'
        self._driver._hdfs_remote_execute(cmd, check_exit_code=True)
        self._driver._run_ssh.assert_called_once_with(
            self.local_ip, (cmd,), True)

    def test__run_ssh(self):
        """_run_ssh() builds an SSH pool and returns the command output."""
        ssh_output = 'fake_ssh_output'
        cmd_list = ['fake', 'cmd']
        ssh = mock.Mock()
        ssh.get_transport = mock.Mock()
        ssh.get_transport().is_active = mock.Mock(return_value=True)
        ssh_pool = mock.Mock()
        ssh_pool.create = mock.Mock(return_value=ssh)
        self.mock_object(ssh_utils,
                         'SSHPool',
                         mock.Mock(return_value=ssh_pool))
        self.mock_object(processutils, 'ssh_execute',
                         mock.Mock(return_value=ssh_output))
        result = self._driver._run_ssh(self.local_ip, cmd_list)
        ssh_utils.SSHPool.assert_called_once_with(
            self._driver.configuration.hdfs_namenode_ip,
            self._driver.configuration.hdfs_ssh_port,
            self._driver.configuration.ssh_conn_timeout,
            self._driver.configuration.hdfs_ssh_name,
            password=self._driver.configuration.hdfs_ssh_pw,
            privatekey=self._driver.configuration.hdfs_ssh_private_key,
            min_size=self._driver.configuration.ssh_min_pool_conn,
            max_size=self._driver.configuration.ssh_max_pool_conn)
        ssh_pool.create.assert_called_once_with()
        ssh.get_transport().is_active.assert_called_once_with()
        processutils.ssh_execute.assert_called_once_with(
            ssh, 'fake cmd', check_exit_code=False)
        self.assertEqual(ssh_output, result)

    def test__run_ssh_exception(self):
        """_run_ssh() converts an ssh_execute failure to HDFSException."""
        cmd_list = ['fake', 'cmd']
        ssh = mock.Mock()
        ssh.get_transport = mock.Mock()
        ssh.get_transport().is_active = mock.Mock(return_value=True)
        ssh_pool = mock.Mock()
        ssh_pool.create = mock.Mock(return_value=ssh)
        self.mock_object(ssh_utils,
                         'SSHPool',
                         mock.Mock(return_value=ssh_pool))
        self.mock_object(processutils, 'ssh_execute',
                         mock.Mock(side_effect=Exception))
        self.assertRaises(exception.HDFSException,
                          self._driver._run_ssh,
                          self.local_ip,
                          cmd_list)
        ssh_utils.SSHPool.assert_called_once_with(
            self._driver.configuration.hdfs_namenode_ip,
            self._driver.configuration.hdfs_ssh_port,
            self._driver.configuration.ssh_conn_timeout,
            self._driver.configuration.hdfs_ssh_name,
            password=self._driver.configuration.hdfs_ssh_pw,
            privatekey=self._driver.configuration.hdfs_ssh_private_key,
            min_size=self._driver.configuration.ssh_min_pool_conn,
            max_size=self._driver.configuration.ssh_max_pool_conn)
        ssh_pool.create.assert_called_once_with()
        ssh.get_transport().is_active.assert_called_once_with()
        processutils.ssh_execute.assert_called_once_with(
            ssh, 'fake cmd', check_exit_code=False)