Coverage for manila/share/drivers/hdfs/hdfs_native.py: 84%

226 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Intel, Corp. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""HDFS native protocol (hdfs) driver for manila shares. 

17 

18Manila share is a directory in HDFS. And this share does not use 

19service VM instance (share server). The instance directly talks 

20to the HDFS cluster. 

21 

22The initial version only supports single namenode and flat network. 

23 

24Configuration Requirements: 

25 To enable access control, HDFS file system must have ACLs enabled. 

26""" 

27 

28import math 

29import os 

30import shlex 

31import socket 

32 

33from oslo_concurrency import processutils 

34from oslo_config import cfg 

35from oslo_log import log 

36from oslo_utils import units 

37 

38from manila import exception 

39from manila.i18n import _ 

40from manila.share import driver 

41from manila import ssh_utils 

42from manila import utils 

43 

LOG = log.getLogger(__name__)

# Configuration options for the HDFS native driver. They are registered
# globally below and also appended to the per-backend configuration in
# HDFSNativeShareDriver.__init__.
hdfs_native_share_opts = [
    cfg.HostAddressOpt('hdfs_namenode_ip',
                       help='The IP of the HDFS namenode.'),
    cfg.PortOpt('hdfs_namenode_port',
                default=9000,
                help='The port of HDFS namenode service.'),
    cfg.PortOpt('hdfs_ssh_port',
                default=22,
                help='HDFS namenode SSH port.'),
    cfg.StrOpt('hdfs_ssh_name',
               help='HDFS namenode ssh login name.'),
    # secret=True keeps the password value out of logged configuration.
    cfg.StrOpt('hdfs_ssh_pw',
               secret=True,
               help='HDFS namenode SSH login password. '
                    'This parameter is not necessary if '
                    '\'hdfs_ssh_private_key\' is configured.'),
    cfg.StrOpt('hdfs_ssh_private_key',
               help='Path to HDFS namenode SSH private '
                    'key for login.'),
]

CONF = cfg.CONF
CONF.register_opts(hdfs_native_share_opts)

69 

70 

class HDFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
    """HDFS Share Driver.

    Executes commands relating to shares. Each share is a top-level
    directory in HDFS, and commands are run through the ``hdfs`` CLI,
    either locally or on the namenode over SSH.

    API version history:

        1.0 - Initial Version
    """

    def __init__(self, *args, **kwargs):
        # The leading False is passed positionally to ShareDriver. Per the
        # module docstring, this driver does not use share servers.
        super(HDFSNativeShareDriver, self).__init__(False, *args, **kwargs)
        self.configuration.append_config_values(hdfs_native_share_opts)
        self.backend_name = self.configuration.safe_get(
            'share_backend_name') or 'HDFS-Native'
        # Cached SSH state per host: {host: (ssh_pool, ssh_client)}.
        self.ssh_connections = {}
        # The following are populated by do_setup().
        self._hdfs_execute = None
        self._hdfs_bin = None
        self._hdfs_base_path = None

    def do_setup(self, context):
        """Do initialization while the share driver starts.

        Chooses local execution when the configured namenode IP is one of
        this host's addresses, and SSH execution otherwise. Also builds the
        ``hdfs://<ip>:<port>`` base URI used for export locations.
        """
        super(HDFSNativeShareDriver, self).do_setup(context)
        host = self.configuration.hdfs_namenode_ip
        local_hosts = socket.gethostbyname_ex(socket.gethostname())[2]
        if host in local_hosts:
            self._hdfs_execute = self._hdfs_local_execute
        else:
            self._hdfs_execute = self._hdfs_remote_execute

        self._hdfs_bin = 'hdfs'
        # Use %-formatting instead of string concatenation. This way an unset
        # hdfs_namenode_ip does not blow up here with a bare TypeError. The
        # missing option is then reported clearly by check_for_setup_error(),
        # which runs after do_setup().
        self._hdfs_base_path = 'hdfs://%s:%s' % (
            host, self.configuration.hdfs_namenode_port)

    def _hdfs_local_execute(self, *cmd, **kwargs):
        """Run an HDFS command on this host via ``utils.execute``.

        Commands run without root unless the caller explicitly passes
        ``run_as_root``. All other keyword arguments are forwarded as-is.
        """
        kwargs.setdefault('run_as_root', False)
        return utils.execute(*cmd, **kwargs)

    def _hdfs_remote_execute(self, *cmd, **kwargs):
        """Run an HDFS command on the configured namenode over SSH.

        Only ``check_exit_code`` (default False) is taken from ``kwargs``.
        Any other keyword arguments are ignored.
        """
        host = self.configuration.hdfs_namenode_ip
        check_exit_code = kwargs.pop('check_exit_code', False)

        return self._run_ssh(host, cmd, check_exit_code)

    def _run_ssh(self, host, cmd_list, check_exit_code=False):
        """Execute ``cmd_list`` on ``host`` through a cached SSH client.

        Every argument is shell-quoted before being joined into one command
        line. A single (pool, client) pair is cached per host, and a client
        whose transport is no longer active is replaced with a fresh one.

        :param host: address to connect to.
        :param cmd_list: command and arguments to run.
        :param check_exit_code: passed to ``processutils.ssh_execute``.
        :returns: whatever ``processutils.ssh_execute`` returns.
        :raises HDFSException: if running the remote command fails.
        """
        command = ' '.join(shlex.quote(cmd_arg) for cmd_arg in cmd_list)
        connection = self.ssh_connections.get(host)
        if not connection:
            hdfs_ssh_name = self.configuration.hdfs_ssh_name
            password = self.configuration.hdfs_ssh_pw
            privatekey = self.configuration.hdfs_ssh_private_key
            hdfs_ssh_port = self.configuration.hdfs_ssh_port
            ssh_conn_timeout = self.configuration.ssh_conn_timeout
            min_size = self.configuration.ssh_min_pool_conn
            max_size = self.configuration.ssh_max_pool_conn

            ssh_pool = ssh_utils.SSHPool(host,
                                         hdfs_ssh_port,
                                         ssh_conn_timeout,
                                         hdfs_ssh_name,
                                         password=password,
                                         privatekey=privatekey,
                                         min_size=min_size,
                                         max_size=max_size)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)
        else:
            ssh_pool, ssh = connection

        # A cached client may have been disconnected; reconnect if so.
        if not ssh.get_transport().is_active():
            ssh_pool.remove(ssh)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)

        try:
            return processutils.ssh_execute(
                ssh,
                command,
                check_exit_code=check_exit_code)
        except Exception as e:
            msg = (_('Error running SSH command: %(cmd)s. '
                     'Error: %(excmsg)s.') %
                   {'cmd': command, 'excmsg': str(e)})
            LOG.error(msg)
            # Chain the original error so the root cause stays visible.
            raise exception.HDFSException(msg) from e

    def _set_share_size(self, share, size=None):
        """Apply an HDFS space quota to the share's directory.

        :param share: share whose ``name`` identifies the directory.
        :param size: new size in gigabytes. When falsy, ``share['size']``
            is used instead.
        :raises HDFSException: if ``hdfs dfsadmin -setSpaceQuota`` fails.
        """
        share_dir = '/' + share['name']
        sizestr = str(size or share['size']) + 'g'

        try:
            self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
                               '-setSpaceQuota', sizestr, share_dir)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to set space quota for the '
                     'share %(sharename)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _create_share(self, share):
        """Create the share's HDFS directory, set its quota, enable snapshots.

        :raises HDFSException: if the share protocol is not HDFS, or if any
            of the HDFS commands fails.
        """
        if share['share_proto'].lower() != 'hdfs':
            msg = _('Only HDFS protocol supported!')
            LOG.error(msg)
            raise exception.HDFSException(msg)

        share_dir = '/' + share['name']

        try:
            self._hdfs_execute(self._hdfs_bin, 'dfs',
                               '-mkdir', share_dir)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to create directory in hdfs for the '
                     'share %(sharename)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

        # set share size
        self._set_share_size(share)

        try:
            self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
                               '-allowSnapshot', share_dir)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to allow snapshot for the '
                     'share %(sharename)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _get_share_path(self, share):
        """Return the share's URI, ``<hdfs base path>/<share name>``."""
        return os.path.join(self._hdfs_base_path, share['name'])

    def _get_snapshot_path(self, snapshot):
        """Return ``/<share name>/.snapshot/<snapshot name>``."""
        snapshot_dir = '.snapshot'
        return os.path.join('/', snapshot['share_name'],
                            snapshot_dir, snapshot['name'])

    def get_network_allocations_number(self):
        """Return 0; this driver requests no network allocations."""
        return 0

    def create_share(self, context, share, share_server=None):
        """Create an HDFS directory that acts as a share.

        :returns: the share's ``hdfs://`` export path.
        """
        self._create_share(share)
        return self._get_share_path(share)

    def create_share_from_snapshot(self, context, share, snapshot,
                                   share_server=None, parent_share=None):
        """Create a share and populate it with a snapshot's contents.

        :returns: the new share's ``hdfs://`` export path.
        :raises HDFSException: if share creation or the copy fails.
        """
        self._create_share(share)
        share_path = '/' + share['name']
        snapshot_path = self._get_snapshot_path(snapshot)

        try:
            # check if the directory is empty
            (out, __) = self._hdfs_execute(
                self._hdfs_bin, 'dfs', '-ls', snapshot_path)
            # Only copy files when the snapshot directory is not empty;
            # copying a "/*" glob that matches nothing would fail.
            if out:
                copy_path = snapshot_path + "/*"

                cmd = [self._hdfs_bin, 'dfs', '-cp',
                       copy_path, share_path]

                self._hdfs_execute(*cmd)

        except exception.ProcessExecutionError as e:
            # NOTE(review): the share directory created above is not removed
            # on failure here.
            msg = (_('Failed to create share %(sharename)s from '
                     'snapshot %(snapshotname)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'snapshotname': snapshot['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

        return self._get_share_path(share)

    def create_snapshot(self, context, snapshot, share_server=None):
        """Create an HDFS snapshot of the snapshot's parent share directory.

        :raises HDFSException: if ``hdfs dfs -createSnapshot`` fails.
        """
        share_dir = '/' + snapshot['share_name']
        snapshot_name = snapshot['name']

        cmd = [self._hdfs_bin, 'dfs', '-createSnapshot',
               share_dir, snapshot_name]
        try:
            self._hdfs_execute(*cmd)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to create snapshot %(snapshotname)s for '
                     'the share %(sharename)s. Error: %(excmsg)s.') %
                   {'snapshotname': snapshot_name,
                    'sharename': snapshot['share_name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def delete_share(self, context, share, share_server=None):
        """Recursively remove the share's HDFS directory.

        :raises HDFSException: if ``hdfs dfs -rm -r`` fails.
        """
        share_dir = '/' + share['name']

        cmd = [self._hdfs_bin, 'dfs', '-rm', '-r', share_dir]
        try:
            self._hdfs_execute(*cmd)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to delete share %(sharename)s. '
                     'Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def delete_snapshot(self, context, snapshot, share_server=None):
        """Delete an HDFS snapshot of the parent share directory.

        :raises HDFSException: if ``hdfs dfs -deleteSnapshot`` fails.
        """
        share_dir = '/' + snapshot['share_name']

        cmd = [self._hdfs_bin, 'dfs', '-deleteSnapshot',
               share_dir, snapshot['name']]
        try:
            self._hdfs_execute(*cmd)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to delete snapshot %(snapshotname)s. '
                     'Error: %(excmsg)s.') %
                   {'snapshotname': snapshot['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def ensure_share(self, context, share, share_server=None):
        """Ensure the share is exported; this driver has nothing to do."""

    def allow_access(self, context, share, access, share_server=None):
        """Grant a user access to the share through a recursive HDFS ACL.

        :raises InvalidShareAccess: for non-'user' access types or access
            levels other than 'rw' / 'ro'.
        :raises HDFSException: if ``hdfs dfs -setfacl -m`` fails.
        """
        if access['access_type'] != 'user':
            msg = _("Only 'user' access type allowed!")
            LOG.error(msg)
            raise exception.InvalidShareAccess(msg)

        # Note(jun): For directories in HDFS, the x permission is
        # required to access a child of the directory.
        if access['access_level'] == 'rw':
            access_level = 'rwx'
        elif access['access_level'] == 'ro':
            access_level = 'r-x'
        else:
            msg = (_('The access level %(accesslevel)s was unsupported.') %
                   {'accesslevel': access['access_level']})
            LOG.error(msg)
            raise exception.InvalidShareAccess(msg)

        share_dir = '/' + share['name']
        # ACL entry of the form "user:<name>:<permissions>".
        user_access = ':'.join([access['access_type'],
                                access['access_to'],
                                access_level])

        cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-m', '-R',
               user_access, share_dir]
        try:
            self._hdfs_execute(*cmd, check_exit_code=True)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to set ACL of share %(sharename)s for '
                     'user: %(username)s. '
                     'Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'username': access['access_to'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def deny_access(self, context, share, access, share_server=None):
        """Remove a user's recursive HDFS ACL entry from the share.

        :raises HDFSException: if ``hdfs dfs -setfacl -x`` fails.
        """
        share_dir = '/' + share['name']
        # ACL entry to remove, of the form "<access type>:<name>".
        access_name = ':'.join([access['access_type'], access['access_to']])

        cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-x', '-R',
               access_name, share_dir]
        try:
            self._hdfs_execute(*cmd, check_exit_code=True)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to deny ACL of share %(sharename)s for '
                     'user: %(username)s. '
                     'Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'username': access['access_to'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def extend_share(self, share, new_size, share_server=None):
        """Extend the share by raising its HDFS space quota to new_size."""
        self._set_share_size(share, new_size)

    def _check_hdfs_state(self):
        """Return True if ``hdfs fsck /`` output contains 'HEALTHY'.

        :raises HDFSException: if ``hdfs fsck`` fails to run.
        """
        try:
            (out, __) = self._hdfs_execute(self._hdfs_bin, 'fsck', '/')
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to check hdfs state. Error: %(excmsg)s.') %
                   {'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)
        return 'HEALTHY' in out

    def check_for_setup_error(self):
        """Raise an error if the prerequisites are not met.

        :raises HDFSException: if ``hdfs_namenode_ip`` is unset or HDFS is
            not reported as healthy.
        """
        if not self.configuration.hdfs_namenode_ip:
            msg = _('Not specify the hdfs cluster yet! '
                    'Add the ip of hdfs namenode in the '
                    'hdfs_namenode_ip configuration parameter.')
            LOG.error(msg)
            raise exception.HDFSException(msg)

        if not self._check_hdfs_state():
            msg = _('HDFS is not in healthy state.')
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _get_available_capacity(self):
        """Return ``(total, free)`` as parsed from ``hdfs dfsadmin -report``.

        :raises HDFSException: if the command fails or its output cannot
            be parsed.
        """
        try:
            (out, __) = self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
                                           '-report')
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to check available capacity for hdfs. '
                     'Error: %(excmsg)s.') %
                   {'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

        lines = out.splitlines()
        try:
            # NOTE(review): relies on the report layout. It takes the third
            # whitespace-separated token of lines 1 and 2 as integers
            # (presumably byte counts, given the units.Gi division in
            # _update_share_stats). TODO confirm against the deployed Hadoop
            # version.
            total = int(lines[1].split()[2])
            free = int(lines[2].split()[2])
        except (IndexError, ValueError) as e:
            msg = (_('Failed to get hdfs capacity info. '
                     'Error: %(excmsg)s.') %
                   {'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)
        return total, free

    def _update_share_stats(self):
        """Retrieves stats info of share directories group."""
        config = self.configuration
        data = dict(
            share_backend_name=self.backend_name,
            storage_protocol='HDFS',
            reserved_percentage=config.reserved_share_percentage,
            # The snapshot/extend reservations fall back to the generic
            # share reservation when unset or zero.
            reserved_snapshot_percentage=(
                config.reserved_share_from_snapshot_percentage
                or config.reserved_share_percentage),
            reserved_share_extend_percentage=(
                config.reserved_share_extend_percentage
                or config.reserved_share_percentage),
        )

        total, free = self._get_available_capacity()

        # Round up to whole GiB (units.Gi == 1024 ** 3).
        data['total_capacity_gb'] = math.ceil(total / units.Gi)
        data['free_capacity_gb'] = math.ceil(free / units.Gi)

        super(HDFSNativeShareDriver, self)._update_share_stats(data)
220 snapshot_dir, snapshot['name']) 

221 

222 def get_network_allocations_number(self): 

223 return 0 

224 

225 def create_share(self, context, share, share_server=None): 

226 """Create a HDFS directory which acted as a share.""" 

227 self._create_share(share) 

228 return self._get_share_path(share) 

229 

230 def create_share_from_snapshot(self, context, share, snapshot, 

231 share_server=None, parent_share=None): 

232 """Creates a snapshot.""" 

233 self._create_share(share) 

234 share_path = '/' + share['name'] 

235 snapshot_path = self._get_snapshot_path(snapshot) 

236 

237 try: 

238 # check if the directory is empty 

239 (out, __) = self._hdfs_execute( 

240 self._hdfs_bin, 'dfs', '-ls', snapshot_path) 

241 # only copy files when the snapshot directory is not empty 

242 if out: 

243 copy_path = snapshot_path + "/*" 

244 

245 cmd = [self._hdfs_bin, 'dfs', '-cp', 

246 copy_path, share_path] 

247 

248 self._hdfs_execute(*cmd) 

249 

250 except exception.ProcessExecutionError as e: 

251 msg = (_('Failed to create share %(sharename)s from ' 

252 'snapshot %(snapshotname)s. Error: %(excmsg)s.') % 

253 {'sharename': share['name'], 

254 'snapshotname': snapshot['name'], 

255 'excmsg': str(e)}) 

256 LOG.error(msg) 

257 raise exception.HDFSException(msg) 

258 

259 return self._get_share_path(share) 

260 

261 def create_snapshot(self, context, snapshot, share_server=None): 

262 """Creates a snapshot.""" 

263 share_dir = '/' + snapshot['share_name'] 

264 snapshot_name = snapshot['name'] 

265 

266 cmd = [self._hdfs_bin, 'dfs', '-createSnapshot', 

267 share_dir, snapshot_name] 

268 try: 

269 self._hdfs_execute(*cmd) 

270 except exception.ProcessExecutionError as e: 

271 msg = (_('Failed to create snapshot %(snapshotname)s for ' 

272 'the share %(sharename)s. Error: %(excmsg)s.') % 

273 {'snapshotname': snapshot_name, 

274 'sharename': snapshot['share_name'], 

275 'excmsg': str(e)}) 

276 LOG.error(msg) 

277 raise exception.HDFSException(msg) 

278 

279 def delete_share(self, context, share, share_server=None): 

280 """Deletes share storage.""" 

281 share_dir = '/' + share['name'] 

282 

283 cmd = [self._hdfs_bin, 'dfs', '-rm', '-r', share_dir] 

284 try: 

285 self._hdfs_execute(*cmd) 

286 except exception.ProcessExecutionError as e: 

287 msg = (_('Failed to delete share %(sharename)s. ' 

288 'Error: %(excmsg)s.') % 

289 {'sharename': share['name'], 

290 'excmsg': str(e)}) 

291 LOG.error(msg) 

292 raise exception.HDFSException(msg) 

293 

294 def delete_snapshot(self, context, snapshot, share_server=None): 

295 """Deletes a snapshot.""" 

296 share_dir = '/' + snapshot['share_name'] 

297 

298 cmd = [self._hdfs_bin, 'dfs', '-deleteSnapshot', 

299 share_dir, snapshot['name']] 

300 try: 

301 self._hdfs_execute(*cmd) 

302 except exception.ProcessExecutionError as e: 

303 msg = (_('Failed to delete snapshot %(snapshotname)s. ' 

304 'Error: %(excmsg)s.') % 

305 {'snapshotname': snapshot['name'], 

306 'excmsg': str(e)}) 

307 LOG.error(msg) 

308 raise exception.HDFSException(msg) 

309 

310 def ensure_share(self, context, share, share_server=None): 

311 """Ensure the storage are exported.""" 

312 

313 def allow_access(self, context, share, access, share_server=None): 

314 """Allows access to the share for a given user.""" 

315 if access['access_type'] != 'user': 

316 msg = _("Only 'user' access type allowed!") 

317 LOG.error(msg) 

318 raise exception.InvalidShareAccess(msg) 

319 

320 # Note(jun): For directories in HDFS, the x permission is 

321 # required to access a child of the directory. 

322 if access['access_level'] == 'rw': 322 ↛ 324line 322 didn't jump to line 324 because the condition on line 322 was always true

323 access_level = 'rwx' 

324 elif access['access_level'] == 'ro': 

325 access_level = 'r-x' 

326 else: 

327 msg = (_('The access level %(accesslevel)s was unsupported.') % 

328 {'accesslevel': access['access_level']}) 

329 LOG.error(msg) 

330 raise exception.InvalidShareAccess(msg) 

331 

332 share_dir = '/' + share['name'] 

333 user_access = ':'.join([access['access_type'], 

334 access['access_to'], 

335 access_level]) 

336 

337 cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-m', '-R', 

338 user_access, share_dir] 

339 try: 

340 (__, out) = self._hdfs_execute(*cmd, check_exit_code=True) 

341 except exception.ProcessExecutionError as e: 

342 msg = (_('Failed to set ACL of share %(sharename)s for ' 

343 'user: %(username)s' 

344 'Error: %(excmsg)s.') % 

345 {'sharename': share['name'], 

346 'username': access['access_to'], 

347 'excmsg': str(e)}) 

348 LOG.error(msg) 

349 raise exception.HDFSException(msg) 

350 

351 def deny_access(self, context, share, access, share_server=None): 

352 """Denies the access to the share for a given user.""" 

353 share_dir = '/' + share['name'] 

354 access_name = ':'.join([access['access_type'], access['access_to']]) 

355 

356 cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-x', '-R', 

357 access_name, share_dir] 

358 try: 

359 (__, out) = self._hdfs_execute(*cmd, check_exit_code=True) 

360 except exception.ProcessExecutionError as e: 

361 msg = (_('Failed to deny ACL of share %(sharename)s for ' 

362 'user: %(username)s' 

363 'Error: %(excmsg)s.') % 

364 {'sharename': share['name'], 

365 'username': access['access_to'], 

366 'excmsg': str(e)}) 

367 LOG.error(msg) 

368 raise exception.HDFSException(msg) 

369 

370 def extend_share(self, share, new_size, share_server=None): 

371 """Extend share storage.""" 

372 self._set_share_size(share, new_size) 

373 

374 def _check_hdfs_state(self): 

375 try: 

376 (out, __) = self._hdfs_execute(self._hdfs_bin, 'fsck', '/') 

377 except exception.ProcessExecutionError as e: 

378 msg = (_('Failed to check hdfs state. Error: %(excmsg)s.') % 

379 {'excmsg': str(e)}) 

380 LOG.error(msg) 

381 raise exception.HDFSException(msg) 

382 if 'HEALTHY' in out: 

383 return True 

384 else: 

385 return False 

386 

387 def check_for_setup_error(self): 

388 """Return an error if the prerequisites are met.""" 

389 if not self.configuration.hdfs_namenode_ip: 

390 msg = _('Not specify the hdfs cluster yet! ' 

391 'Add the ip of hdfs namenode in the ' 

392 'hdfs_namenode_ip configuration parameter.') 

393 LOG.error(msg) 

394 raise exception.HDFSException(msg) 

395 

396 if not self._check_hdfs_state(): 

397 msg = _('HDFS is not in healthy state.') 

398 LOG.error(msg) 

399 raise exception.HDFSException(msg) 

400 

401 def _get_available_capacity(self): 

402 """Calculate available space on path.""" 

403 try: 

404 (out, __) = self._hdfs_execute(self._hdfs_bin, 'dfsadmin', 

405 '-report') 

406 except exception.ProcessExecutionError as e: 

407 msg = (_('Failed to check available capacity for hdfs.' 

408 'Error: %(excmsg)s.') % 

409 {'excmsg': str(e)}) 

410 LOG.error(msg) 

411 raise exception.HDFSException(msg) 

412 

413 lines = out.splitlines() 

414 try: 

415 total = int(lines[1].split()[2]) 

416 free = int(lines[2].split()[2]) 

417 except (IndexError, ValueError) as e: 

418 msg = (_('Failed to get hdfs capacity info. ' 

419 'Error: %(excmsg)s.') % 

420 {'excmsg': str(e)}) 

421 LOG.error(msg) 

422 raise exception.HDFSException(msg) 

423 return total, free 

424 

425 def _update_share_stats(self): 

426 """Retrieves stats info of share directories group.""" 

427 

428 data = dict(share_backend_name=self.backend_name, 

429 storage_protocol='HDFS', 

430 reserved_percentage=self.configuration. 

431 reserved_share_percentage, 

432 reserved_snapshot_percentage=self.configuration. 

433 reserved_share_from_snapshot_percentage 

434 or self.configuration.reserved_share_percentage, 

435 reserved_share_extend_percentage=self.configuration. 

436 reserved_share_extend_percentage 

437 or self.configuration.reserved_share_percentage) 

438 

439 total, free = self._get_available_capacity() 

440 

441 data['total_capacity_gb'] = math.ceil(total / units.Gi) 

442 data['free_capacity_gb'] = math.ceil(free / units.Gi) 

443 

444 super(HDFSNativeShareDriver, self)._update_share_stats(data)