Coverage for manila/share/drivers/hdfs/hdfs_native.py: 84%
226 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 Intel, Corp.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""HDFS native protocol (hdfs) driver for manila shares.
18Manila share is a directory in HDFS. And this share does not use
19service VM instance (share server). The instance directly talks
20to the HDFS cluster.
22The initial version only supports single namenode and flat network.
24Configuration Requirements:
25 To enable access control, HDFS file system must have ACLs enabled.
26"""
28import math
29import os
30import shlex
31import socket
33from oslo_concurrency import processutils
34from oslo_config import cfg
35from oslo_log import log
36from oslo_utils import units
38from manila import exception
39from manila.i18n import _
40from manila.share import driver
41from manila import ssh_utils
42from manila import utils
LOG = log.getLogger(__name__)

# Configuration options for the HDFS native driver. They are registered on
# the global CONF here and appended to each backend's configuration in
# HDFSNativeShareDriver.__init__.
hdfs_native_share_opts = [
    cfg.HostAddressOpt('hdfs_namenode_ip',
                       help='The IP of the HDFS namenode.'),
    cfg.PortOpt('hdfs_namenode_port',
                default=9000,
                help='The port of HDFS namenode service.'),
    cfg.PortOpt('hdfs_ssh_port',
                default=22,
                help='HDFS namenode SSH port.'),
    cfg.StrOpt('hdfs_ssh_name',
               help='HDFS namenode ssh login name.'),
    cfg.StrOpt('hdfs_ssh_pw',
               secret=True,
               help='HDFS namenode SSH login password. '
                    'This parameter is not necessary if '
                    '\'hdfs_ssh_private_key\' is configured.'),
    cfg.StrOpt('hdfs_ssh_private_key',
               help='Path to HDFS namenode SSH private '
                    'key for login.'),
]

CONF = cfg.CONF
CONF.register_opts(hdfs_native_share_opts)
class HDFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
    """HDFS Share Driver.

    Executes commands relating to shares. Each share is a top-level HDFS
    directory named ``/<share name>``. Commands are issued through the
    ``hdfs`` CLI, run either locally (when this host owns the configured
    namenode IP) or over SSH on the namenode.

    API version history:

        1.0 - Initial Version
    """

    def __init__(self, *args, **kwargs):
        # The leading False is presumably driver_handles_share_servers;
        # this driver does not use share servers.
        super(HDFSNativeShareDriver, self).__init__(False, *args, **kwargs)
        self.configuration.append_config_values(hdfs_native_share_opts)
        self.backend_name = self.configuration.safe_get(
            'share_backend_name') or 'HDFS-Native'
        # host -> (SSHPool, ssh client), filled lazily by _run_ssh().
        self.ssh_connections = {}
        # The following are populated by do_setup().
        self._hdfs_execute = None
        self._hdfs_bin = None
        self._hdfs_base_path = None

    def _check_namenode_configured(self):
        """Raise HDFSException if ``hdfs_namenode_ip`` is not configured."""
        if not self.configuration.hdfs_namenode_ip:
            msg = _('Not specify the hdfs cluster yet! '
                    'Add the ip of hdfs namenode in the '
                    'hdfs_namenode_ip configuration parameter.')
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def do_setup(self, context):
        """Do initialization while the share driver starts.

        Selects local or SSH execution of hdfs commands and builds the
        ``hdfs://<namenode_ip>:<namenode_port>`` base URI used for export
        locations.

        :raises HDFSException: if ``hdfs_namenode_ip`` is not configured.
        """
        super(HDFSNativeShareDriver, self).do_setup(context)
        # Validate first so a missing IP produces a clear error instead of
        # a TypeError while building the base path below.
        self._check_namenode_configured()
        host = self.configuration.hdfs_namenode_ip
        try:
            local_hosts = socket.gethostbyname_ex(socket.gethostname())[2]
        except (socket.gaierror, socket.herror):
            # Our own hostname does not resolve, so we cannot tell whether
            # the namenode is local; SSH execution works in either case.
            local_hosts = []
        if host in local_hosts:
            self._hdfs_execute = self._hdfs_local_execute
        else:
            self._hdfs_execute = self._hdfs_remote_execute

        self._hdfs_bin = 'hdfs'
        self._hdfs_base_path = 'hdfs://%s:%s' % (
            host, self.configuration.hdfs_namenode_port)

    def _hdfs_local_execute(self, *cmd, **kwargs):
        """Run a command on this host via ``utils.execute``.

        ``run_as_root`` defaults to False unless the caller supplies it.
        """
        kwargs.setdefault('run_as_root', False)
        return utils.execute(*cmd, **kwargs)

    def _hdfs_remote_execute(self, *cmd, **kwargs):
        """Run a command on the namenode over SSH.

        Only ``check_exit_code`` is taken from kwargs; any other keyword
        arguments are ignored.
        """
        host = self.configuration.hdfs_namenode_ip
        # NOTE(review): a non-zero exit status is only treated as an error
        # here when check_exit_code=True is passed explicitly. This probably
        # differs from utils.execute's default -- confirm it is intended.
        check_exit_code = kwargs.pop('check_exit_code', False)
        return self._run_ssh(host, cmd, check_exit_code)

    def _run_ssh(self, host, cmd_list, check_exit_code=False):
        """Execute ``cmd_list`` on ``host`` over a cached SSH connection.

        Each argument is shell-quoted and joined into a single command
        string. One (SSHPool, client) pair is cached per host; if the
        cached client's transport is inactive, it is replaced with a new
        client from the same pool.

        :returns: the result of ``processutils.ssh_execute``.
        :raises HDFSException: if running the command fails for any reason.
        """
        command = ' '.join(shlex.quote(cmd_arg) for cmd_arg in cmd_list)
        connection = self.ssh_connections.get(host)
        if not connection:
            hdfs_ssh_name = self.configuration.hdfs_ssh_name
            password = self.configuration.hdfs_ssh_pw
            privatekey = self.configuration.hdfs_ssh_private_key
            hdfs_ssh_port = self.configuration.hdfs_ssh_port
            ssh_conn_timeout = self.configuration.ssh_conn_timeout
            min_size = self.configuration.ssh_min_pool_conn
            max_size = self.configuration.ssh_max_pool_conn

            ssh_pool = ssh_utils.SSHPool(host,
                                         hdfs_ssh_port,
                                         ssh_conn_timeout,
                                         hdfs_ssh_name,
                                         password=password,
                                         privatekey=privatekey,
                                         min_size=min_size,
                                         max_size=max_size)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)
        else:
            ssh_pool, ssh = connection

        # Replace a stale client (e.g. one dropped by the server).
        if not ssh.get_transport().is_active():
            ssh_pool.remove(ssh)
            ssh = ssh_pool.create()
            self.ssh_connections[host] = (ssh_pool, ssh)

        try:
            return processutils.ssh_execute(
                ssh,
                command,
                check_exit_code=check_exit_code)
        except Exception as e:
            msg = (_('Error running SSH command: %(cmd)s. '
                     'Error: %(excmsg)s.') %
                   {'cmd': command, 'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _set_share_size(self, share, size=None):
        """Set the HDFS space quota of the share directory.

        :param size: quota in GiB; falls back to ``share['size']`` when
            ``size`` is falsy.
        :raises HDFSException: if the quota cannot be set.
        """
        share_dir = '/' + share['name']
        sizestr = str(size or share['size']) + 'g'

        try:
            self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
                               '-setSpaceQuota', sizestr, share_dir)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to set space quota for the '
                     'share %(sharename)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _create_share(self, share):
        """Create the share directory, set its quota and allow snapshots.

        :raises HDFSException: if the protocol is not HDFS or any hdfs
            command fails.
        """
        if share['share_proto'].lower() != 'hdfs':
            msg = _('Only HDFS protocol supported!')
            LOG.error(msg)
            raise exception.HDFSException(msg)

        share_dir = '/' + share['name']

        try:
            self._hdfs_execute(self._hdfs_bin, 'dfs',
                               '-mkdir', share_dir)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to create directory in hdfs for the '
                     'share %(sharename)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

        # set share size
        self._set_share_size(share)

        # Snapshots must be allowed on the directory before
        # create_snapshot can run -createSnapshot on it.
        try:
            self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
                               '-allowSnapshot', share_dir)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to allow snapshot for the '
                     'share %(sharename)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _get_share_path(self, share):
        """Return share path on storage provider."""
        return os.path.join(self._hdfs_base_path, share['name'])

    def _get_snapshot_path(self, snapshot):
        """Return snapshot path on storage provider.

        The result has the form ``/<share name>/.snapshot/<snapshot name>``.
        """
        snapshot_dir = '.snapshot'
        return os.path.join('/', snapshot['share_name'],
                            snapshot_dir, snapshot['name'])

    def get_network_allocations_number(self):
        """Return 0; this driver needs no network allocations."""
        return 0

    def create_share(self, context, share, share_server=None):
        """Create a HDFS directory which acted as a share."""
        self._create_share(share)
        return self._get_share_path(share)

    def create_share_from_snapshot(self, context, share, snapshot,
                                   share_server=None, parent_share=None):
        """Create a share and populate it with a snapshot's contents.

        :returns: the export path of the new share.
        :raises HDFSException: if the share cannot be created or the
            snapshot contents cannot be copied.
        """
        self._create_share(share)
        share_path = '/' + share['name']
        snapshot_path = self._get_snapshot_path(snapshot)

        try:
            # check if the directory is empty
            (out, __) = self._hdfs_execute(
                self._hdfs_bin, 'dfs', '-ls', snapshot_path)
            # only copy files when the snapshot directory is not empty
            if out:
                # The glob is passed through unexpanded (no local shell,
                # or shell-quoted over SSH); presumably hdfs expands it.
                copy_path = snapshot_path + "/*"

                cmd = [self._hdfs_bin, 'dfs', '-cp',
                       copy_path, share_path]

                self._hdfs_execute(*cmd)

        except exception.ProcessExecutionError as e:
            msg = (_('Failed to create share %(sharename)s from '
                     'snapshot %(snapshotname)s. Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'snapshotname': snapshot['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

        return self._get_share_path(share)

    def create_snapshot(self, context, snapshot, share_server=None):
        """Creates a snapshot."""
        share_dir = '/' + snapshot['share_name']
        snapshot_name = snapshot['name']

        cmd = [self._hdfs_bin, 'dfs', '-createSnapshot',
               share_dir, snapshot_name]
        try:
            self._hdfs_execute(*cmd)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to create snapshot %(snapshotname)s for '
                     'the share %(sharename)s. Error: %(excmsg)s.') %
                   {'snapshotname': snapshot_name,
                    'sharename': snapshot['share_name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def delete_share(self, context, share, share_server=None):
        """Deletes share storage (recursively removes the directory)."""
        share_dir = '/' + share['name']

        cmd = [self._hdfs_bin, 'dfs', '-rm', '-r', share_dir]
        try:
            self._hdfs_execute(*cmd)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to delete share %(sharename)s. '
                     'Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def delete_snapshot(self, context, snapshot, share_server=None):
        """Deletes a snapshot."""
        share_dir = '/' + snapshot['share_name']

        cmd = [self._hdfs_bin, 'dfs', '-deleteSnapshot',
               share_dir, snapshot['name']]
        try:
            self._hdfs_execute(*cmd)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to delete snapshot %(snapshotname)s. '
                     'Error: %(excmsg)s.') %
                   {'snapshotname': snapshot['name'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def ensure_share(self, context, share, share_server=None):
        """Ensure the storage is exported (no-op for this driver)."""

    def allow_access(self, context, share, access, share_server=None):
        """Allow a user access to the share by adding an HDFS ACL entry.

        :raises InvalidShareAccess: for a non-'user' access type or an
            access level other than 'rw'/'ro'.
        :raises HDFSException: if the ACL cannot be set.
        """
        if access['access_type'] != 'user':
            msg = _("Only 'user' access type allowed!")
            LOG.error(msg)
            raise exception.InvalidShareAccess(msg)

        # Note(jun): For directories in HDFS, the x permission is
        # required to access a child of the directory.
        permissions = {'rw': 'rwx', 'ro': 'r-x'}
        access_level = permissions.get(access['access_level'])
        if access_level is None:
            msg = (_('The access level %(accesslevel)s was unsupported.') %
                   {'accesslevel': access['access_level']})
            LOG.error(msg)
            raise exception.InvalidShareAccess(msg)

        share_dir = '/' + share['name']
        user_access = ':'.join([access['access_type'],
                                access['access_to'],
                                access_level])

        cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-m', '-R',
               user_access, share_dir]
        try:
            self._hdfs_execute(*cmd, check_exit_code=True)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to set ACL of share %(sharename)s for '
                     'user: %(username)s. '
                     'Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'username': access['access_to'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def deny_access(self, context, share, access, share_server=None):
        """Remove a user's HDFS ACL entry from the share.

        :raises HDFSException: if the ACL entry cannot be removed.
        """
        share_dir = '/' + share['name']
        access_name = ':'.join([access['access_type'], access['access_to']])

        cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-x', '-R',
               access_name, share_dir]
        try:
            self._hdfs_execute(*cmd, check_exit_code=True)
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to deny ACL of share %(sharename)s for '
                     'user: %(username)s. '
                     'Error: %(excmsg)s.') %
                   {'sharename': share['name'],
                    'username': access['access_to'],
                    'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def extend_share(self, share, new_size, share_server=None):
        """Extend share storage by raising its space quota to new_size."""
        self._set_share_size(share, new_size)

    def _check_hdfs_state(self):
        """Return True if ``hdfs fsck /`` reports the filesystem HEALTHY.

        :raises HDFSException: if fsck cannot be run.
        """
        try:
            (out, __) = self._hdfs_execute(self._hdfs_bin, 'fsck', '/')
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to check hdfs state. Error: %(excmsg)s.') %
                   {'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)
        return 'HEALTHY' in out

    def check_for_setup_error(self):
        """Raise an error if the prerequisites are not met.

        :raises HDFSException: if the namenode IP is not configured or
            HDFS is not healthy.
        """
        self._check_namenode_configured()

        if not self._check_hdfs_state():
            msg = _('HDFS is not in healthy state.')
            LOG.error(msg)
            raise exception.HDFSException(msg)

    def _get_available_capacity(self):
        """Return (total, free) capacity in bytes from ``dfsadmin -report``.

        Assumes the third whitespace-separated token of report lines 1 and
        2 holds the total and free byte counts respectively.

        :raises HDFSException: if the report cannot be run or parsed.
        """
        try:
            (out, __) = self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
                                           '-report')
        except exception.ProcessExecutionError as e:
            msg = (_('Failed to check available capacity for hdfs. '
                     'Error: %(excmsg)s.') %
                   {'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)

        lines = out.splitlines()
        try:
            total = int(lines[1].split()[2])
            free = int(lines[2].split()[2])
        except (IndexError, ValueError) as e:
            msg = (_('Failed to get hdfs capacity info. '
                     'Error: %(excmsg)s.') %
                   {'excmsg': str(e)})
            LOG.error(msg)
            raise exception.HDFSException(msg)
        return total, free

    def _update_share_stats(self):
        """Retrieves stats info of share directories group."""

        data = dict(share_backend_name=self.backend_name,
                    storage_protocol='HDFS',
                    reserved_percentage=self.configuration.
                    reserved_share_percentage,
                    reserved_snapshot_percentage=self.configuration.
                    reserved_share_from_snapshot_percentage
                    or self.configuration.reserved_share_percentage,
                    reserved_share_extend_percentage=self.configuration.
                    reserved_share_extend_percentage
                    or self.configuration.reserved_share_percentage)

        total, free = self._get_available_capacity()

        # Convert bytes to GiB, rounding up.
        data['total_capacity_gb'] = math.ceil(total / units.Gi)
        data['free_capacity_gb'] = math.ceil(free / units.Gi)

        super(HDFSNativeShareDriver, self)._update_share_stats(data)