Coverage for manila/share/drivers/maprfs/maprfs_native.py: 97%

245 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016, MapR Technologies 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16""" 

17Share driver for MapR-FS distributed file system. 

18""" 

19import math 

20import os 

21 

22from oslo_config import cfg 

23from oslo_log import log 

24from oslo_utils import strutils 

25from oslo_utils import units 

26 

27from manila import context 

28from manila import exception 

29from manila.i18n import _ 

30from manila.share import api 

31from manila.share import driver 

32 

33from manila.share.drivers.maprfs import driver_util as mapru 

34 

35LOG = log.getLogger(__name__) 

36 

37maprfs_native_share_opts = [ 

38 cfg.ListOpt('maprfs_clinode_ip', 

39 help='The list of IPs or hostnames of nodes where mapr-core ' 

40 'is installed.'), 

41 cfg.PortOpt('maprfs_ssh_port', 

42 default=22, 

43 help='CLDB node SSH port.'), 

44 cfg.StrOpt('maprfs_ssh_name', 

45 default="mapr", 

46 help='Cluster admin user ssh login name.'), 

47 cfg.StrOpt('maprfs_ssh_pw', 

48 secret=True, 

49 help='Cluster node SSH login password, ' 

50 'This parameter is not necessary, if ' 

51 '\'maprfs_ssh_private_key\' is configured.'), 

52 cfg.StrOpt('maprfs_ssh_private_key', 

53 help='Path to SSH private ' 

54 'key for login.'), 

55 cfg.StrOpt('maprfs_base_volume_dir', 

56 default='/', 

57 help='Path in MapRFS where share volumes must be created.'), 

58 cfg.ListOpt('maprfs_zookeeper_ip', 

59 help='The list of IPs or hostnames of ZooKeeper nodes.'), 

60 cfg.ListOpt('maprfs_cldb_ip', 

61 help='The list of IPs or hostnames of CLDB nodes.'), 

62 cfg.BoolOpt('maprfs_rename_managed_volume', 

63 default=True, 

64 help='Specify whether existing volume should be renamed when' 

65 ' start managing.'), 

66] 

67 

68CONF = cfg.CONF 

69CONF.register_opts(maprfs_native_share_opts) 

70 

71 

72class MapRFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver): 

73 """MapR-FS Share Driver. 

74 

75 Executes commands relating to shares. 

76 driver_handles_share_servers must be False because this driver does not 

77 support creating or managing virtual storage servers (share servers) 

78 API version history: 

79 

80 1.0 - Initial Version 

81 """ 

82 

83 def __init__(self, *args, **kwargs): 

84 super(MapRFSNativeShareDriver, self).__init__(False, *args, **kwargs) 

85 self.configuration.append_config_values(maprfs_native_share_opts) 

86 self.backend_name = self.configuration.safe_get( 

87 'share_backend_name') or 'MapR-FS-Native' 

88 self._base_volume_dir = self.configuration.safe_get( 

89 'maprfs_base_volume_dir') or '/' 

90 self._maprfs_util = None 

91 self._maprfs_base_path = "maprfs://" 

92 self.cldb_ip = self.configuration.maprfs_cldb_ip or [] 

93 self.zookeeper_ip = self.configuration.maprfs_zookeeper_ip or [] 

94 self.rename_volume = self.configuration.maprfs_rename_managed_volume 

95 self.api = api.API() 

96 

97 def do_setup(self, context): 

98 """Do initialization while the share driver starts.""" 

99 super(MapRFSNativeShareDriver, self).do_setup(context) 

100 self._maprfs_util = mapru.get_version_handler(self.configuration) 

101 

102 def _share_dir(self, share_name): 

103 return os.path.join(self._base_volume_dir, share_name) 

104 

105 def _volume_name(self, share_name): 

106 return share_name 

107 

108 def _get_share_path(self, share): 

109 return share['export_location'] 

110 

111 def _get_snapshot_path(self, snapshot): 

112 share_dir = snapshot['share_instance']['export_location'].split( 

113 ' ')[0][len(self._maprfs_base_path):] 

114 return os.path.join(share_dir, '.snapshot', 

115 snapshot['provider_location'] or snapshot['name']) 

116 

117 def _get_volume_name(self, context, share): 

118 metadata = self.api.get_share_metadata(context, 

119 {'id': share['share_id']}) 

120 return metadata.get('_name', self._volume_name(share['name'])) 

121 

122 def _get_share_export_locations(self, share, path=None): 

123 """Return share path on storage provider.""" 

124 cluster_name = self._maprfs_util.get_cluster_name() 

125 path = '%(path)s -C %(cldb)s -Z %(zookeeper)s -N %(name)s' % { 

126 'path': self._maprfs_base_path + ( 

127 path or self._share_dir(share['name'])), 

128 'cldb': ' '.join(self.cldb_ip), 

129 'zookeeper': ' '.join(self.zookeeper_ip), 

130 'name': cluster_name 

131 } 

132 export_list = [{ 

133 "path": path, 

134 "is_admin_only": False, 

135 "metadata": { 

136 "cldb": ','.join(self.cldb_ip), 

137 "zookeeper": ','.join(self.zookeeper_ip), 

138 "cluster-name": cluster_name, 

139 }, 

140 }] 

141 

142 return export_list 

143 

144 def _create_share(self, share, metadata, context): 

145 """Creates a share.""" 

146 if share['share_proto'].lower() != 'maprfs': 

147 msg = _('Only MapRFS protocol supported!') 

148 LOG.error(msg) 

149 raise exception.MapRFSException(msg=msg) 

150 options = {k[1:]: v for k, v in metadata.items() if k[0] == '_'} 

151 share_dir = options.pop('path', self._share_dir(share['name'])) 

152 volume_name = options.pop('name', self._volume_name(share['name'])) 

153 try: 

154 self._maprfs_util.create_volume(volume_name, share_dir, 

155 share['size'], 

156 **options) 

157 # posix permissions should be 777, ACEs are used as a restriction 

158 self._maprfs_util.maprfs_chmod(share_dir, '777') 

159 except exception.ProcessExecutionError: 

160 self.api.update_share_metadata(context, 

161 {'id': share['share_id']}, 

162 {'_name': 'error'}) 

163 msg = (_('Failed to create volume in MapR-FS for the ' 

164 'share %(share_name)s.') % {'share_name': share['name']}) 

165 LOG.exception(msg) 

166 raise exception.MapRFSException(msg=msg) 

167 

168 def _set_share_size(self, share, size): 

169 volume_name = self._get_volume_name(context.get_admin_context(), share) 

170 try: 

171 if share['size'] > size: 

172 info = self._maprfs_util.get_volume_info(volume_name) 

173 used = info['totalused'] 

174 if int(used) >= int(size) * units.Ki: 174 ↛ 177line 174 didn't jump to line 177 because the condition on line 174 was always true

175 raise exception.ShareShrinkingPossibleDataLoss( 

176 share_id=share['id']) 

177 self._maprfs_util.set_volume_size(volume_name, size) 

178 except exception.ProcessExecutionError: 

179 msg = (_('Failed to set space quota for the share %(share_name)s.') 

180 % {'share_name': share['name']}) 

181 LOG.exception(msg) 

182 raise exception.MapRFSException(msg=msg) 

183 

184 def get_network_allocations_number(self): 

185 return 0 

186 

187 def create_share(self, context, share, share_server=None): 

188 """Create a MapRFS volume which acts as a share.""" 

189 metadata = self.api.get_share_metadata(context, 

190 {'id': share['share_id']}) 

191 self._create_share(share, metadata, context) 

192 return self._get_share_export_locations(share, 

193 path=metadata.get('_path')) 

194 

195 def ensure_share(self, context, share, share_server=None): 

196 """Updates export location if it is changes.""" 

197 volume_name = self._get_volume_name(context, share) 

198 if self._maprfs_util.volume_exists(volume_name): 198 ↛ 207line 198 didn't jump to line 207 because the condition on line 198 was always true

199 info = self._maprfs_util.get_volume_info(volume_name) 

200 path = info['mountdir'] 

201 old_location = share['export_locations'][0] 

202 new_location = self._get_share_export_locations( 

203 share, path=path) 

204 if new_location[0]['path'] != old_location['path']: 204 ↛ exitline 204 didn't return from function 'ensure_share' because the condition on line 204 was always true

205 return new_location 

206 else: 

207 raise exception.ShareResourceNotFound(share_id=share['share_id']) 

208 

209 def create_share_from_snapshot(self, context, share, snapshot, 

210 share_server=None, parent_share=None): 

211 """Creates a share from snapshot.""" 

212 metadata = self.api.get_share_metadata(context, 

213 {'id': share['share_id']}) 

214 sn_share_tenant = self.api.get_share_metadata(context, { 

215 'id': snapshot['share_instance']['share_id']}).get('_tenantuser') 

216 if sn_share_tenant and sn_share_tenant != metadata.get('_tenantuser'): 

217 msg = ( 

218 _('Cannot create share from snapshot %(snapshot_name)s ' 

219 'with name %(share_name)s. Error: Tenant user should not ' 

220 'differ from tenant of the source snapshot.') % 

221 {'snapshot_name': snapshot['name'], 

222 'share_name': share['name']}) 

223 LOG.error(msg) 

224 raise exception.MapRFSException(msg=msg) 

225 share_dir = metadata.get('_path', self._share_dir(share['name'])) 

226 snapshot_path = self._get_snapshot_path(snapshot) 

227 self._create_share(share, metadata, context) 

228 

229 try: 

230 if self._maprfs_util.dir_not_empty(snapshot_path): 230 ↛ 240line 230 didn't jump to line 240 because the condition on line 230 was always true

231 self._maprfs_util.maprfs_cp(snapshot_path + '/*', share_dir) 

232 except exception.ProcessExecutionError: 

233 msg = ( 

234 _('Failed to create share from snapshot %(snapshot_name)s ' 

235 'with name %(share_name)s.') % { 

236 'snapshot_name': snapshot['name'], 

237 'share_name': share['name']}) 

238 LOG.exception(msg) 

239 raise exception.MapRFSException(msg=msg) 

240 return self._get_share_export_locations(share, 

241 path=metadata.get('_path')) 

242 

243 def create_snapshot(self, context, snapshot, share_server=None): 

244 """Creates a snapshot.""" 

245 volume_name = self._get_volume_name(context, snapshot['share']) 

246 snapshot_name = snapshot['name'] 

247 try: 

248 self._maprfs_util.create_snapshot(snapshot_name, volume_name) 

249 return {'provider_location': snapshot_name} 

250 except exception.ProcessExecutionError: 

251 msg = ( 

252 _('Failed to create snapshot %(snapshot_name)s for the share ' 

253 '%(share_name)s.') % {'snapshot_name': snapshot_name, 

254 'share_name': snapshot['share_name']}) 

255 LOG.exception(msg) 

256 raise exception.MapRFSException(msg=msg) 

257 

258 def delete_share(self, context, share, share_server=None): 

259 """Deletes share storage.""" 

260 volume_name = self._get_volume_name(context, share) 

261 if volume_name == "error": 

262 LOG.info("Skipping deleting share with name %s, as it does not" 

263 " exist on the backend", share['name']) 

264 return 

265 try: 

266 self._maprfs_util.delete_volume(volume_name) 

267 except exception.ProcessExecutionError: 

268 msg = (_('Failed to delete share %(share_name)s.') % 

269 {'share_name': share['name']}) 

270 LOG.exception(msg) 

271 raise exception.MapRFSException(msg=msg) 

272 

273 def delete_snapshot(self, context, snapshot, share_server=None): 

274 """Deletes a snapshot.""" 

275 snapshot_name = snapshot['provider_location'] or snapshot['name'] 

276 volume_name = self._get_volume_name(context, snapshot['share']) 

277 try: 

278 self._maprfs_util.delete_snapshot(snapshot_name, volume_name) 

279 except exception.ProcessExecutionError: 

280 msg = (_('Failed to delete snapshot %(snapshot_name)s.') % 

281 {'snapshot_name': snapshot['name']}) 

282 LOG.exception(msg) 

283 raise exception.MapRFSException(msg=msg) 

284 

285 def update_access(self, context, share, access_rules, add_rules, 

286 delete_rules, update_rules, share_server=None): 

287 """Update access rules for given share.""" 

288 for access in access_rules: 

289 if access['access_type'].lower() != 'user': 

290 msg = _("Only 'user' access type allowed!") 

291 LOG.error(msg) 

292 raise exception.InvalidShareAccess(reason=msg) 

293 volume_name = self._get_volume_name(context, share) 

294 try: 

295 # 'update_access' is called before share is removed, so this 

296 # method shouldn`t raise exception if share does 

297 # not exist actually 

298 if not self._maprfs_util.volume_exists(volume_name): 

299 LOG.warning('Can not get share %s.', share['name']) 

300 return 

301 # check update 

302 if add_rules or delete_rules: 

303 self._maprfs_util.remove_volume_ace_rules(volume_name, 

304 delete_rules) 

305 self._maprfs_util.add_volume_ace_rules(volume_name, add_rules) 

306 else: 

307 self._maprfs_util.set_volume_ace(volume_name, access_rules) 

308 except exception.ProcessExecutionError: 

309 msg = (_('Failed to update access for share %(name)s.') % 

310 {'name': share['name']}) 

311 LOG.exception(msg) 

312 raise exception.MapRFSException(msg=msg) 

313 

314 def extend_share(self, share, new_size, share_server=None): 

315 """Extend share storage.""" 

316 self._set_share_size(share, new_size) 

317 

318 def shrink_share(self, share, new_size, share_server=None): 

319 """Shrink share storage.""" 

320 self._set_share_size(share, new_size) 

321 

322 def _check_maprfs_state(self): 

323 try: 

324 return self._maprfs_util.check_state() 

325 except exception.ProcessExecutionError: 

326 msg = _('Failed to check MapRFS state.') 

327 LOG.exception(msg) 

328 raise exception.MapRFSException(msg=msg) 

329 

330 def check_for_setup_error(self): 

331 """Return an error if the prerequisites are not met.""" 

332 if not self.configuration.maprfs_clinode_ip: 

333 msg = _( 

334 'MapR cluster has not been specified in the configuration. ' 

335 'Add the ip or list of ip of nodes with mapr-core installed ' 

336 'in the "maprfs_clinode_ip" configuration parameter.') 

337 LOG.error(msg) 

338 raise exception.MapRFSException(msg=msg) 

339 

340 if not self.configuration.maprfs_cldb_ip: 340 ↛ 343line 340 didn't jump to line 343 because the condition on line 340 was always true

341 LOG.warning('CLDB nodes are not specified!') 

342 

343 if not self.configuration.maprfs_zookeeper_ip: 343 ↛ 346line 343 didn't jump to line 346 because the condition on line 343 was always true

344 LOG.warning('Zookeeper nodes are not specified!') 

345 

346 if not self._check_maprfs_state(): 

347 msg = _('MapR-FS is not in healthy state.') 

348 LOG.error(msg) 

349 raise exception.MapRFSException(msg=msg) 

350 try: 

351 self._maprfs_util.maprfs_ls( 

352 os.path.join(self._base_volume_dir, '')) 

353 except exception.ProcessExecutionError: 

354 msg = _('Invalid "maprfs_base_volume_name". No such directory.') 

355 LOG.exception(msg) 

356 raise exception.MapRFSException(msg=msg) 

357 

358 def manage_existing(self, share, driver_options): 

359 try: 

360 # retrieve share path from export location, maprfs:// prefix and 

361 # metadata (-C -Z -N) should be casted away 

362 share_path = share['export_location'].split( 

363 )[0][len(self._maprfs_base_path):] 

364 info = self._maprfs_util.get_volume_info_by_path( 

365 share_path, check_if_exists=True) 

366 if not info: 

367 msg = _("Share %s not found") % share[ 

368 'export_location'] 

369 LOG.error(msg) 

370 raise exception.ManageInvalidShare(reason=msg) 

371 size = math.ceil(float(info['quota']) / units.Ki) 

372 used = math.ceil(float(info['totalused']) / units.Ki) 

373 volume_name = info['volumename'] 

374 should_rename = self.rename_volume 

375 rename_option = driver_options.get('rename') 

376 if rename_option: 

377 should_rename = strutils.bool_from_string(rename_option) 

378 if should_rename: 

379 self._maprfs_util.rename_volume(volume_name, share['name']) 

380 else: 

381 self.api.update_share_metadata(context.get_admin_context(), 

382 {'id': share['share_id']}, 

383 {'_name': volume_name}) 

384 location = self._get_share_export_locations(share, path=share_path) 

385 if size == 0: 

386 size = used 

387 msg = ( 

388 'Share %s has no size quota. Total used value will be' 

389 ' used as share size') 

390 LOG.warning(msg, share['name']) 

391 return {'size': size, 'export_locations': location} 

392 except (ValueError, KeyError, exception.ProcessExecutionError): 

393 msg = _('Failed to manage share.') 

394 LOG.exception(msg) 

395 raise exception.MapRFSException(msg=msg) 

396 

397 def manage_existing_snapshot(self, snapshot, driver_options): 

398 volume_name = self._get_volume_name(context.get_admin_context(), 

399 snapshot['share']) 

400 snapshot_path = self._get_snapshot_path(snapshot) 

401 try: 

402 snapshot_list = self._maprfs_util.get_snapshot_list( 

403 volume_name=volume_name) 

404 snapshot_name = snapshot['provider_location'] 

405 if snapshot_name not in snapshot_list: 

406 msg = _("Snapshot %s not found") % snapshot_name 

407 LOG.error(msg) 

408 raise exception.ManageInvalidShareSnapshot(reason=msg) 

409 size = math.ceil(float(self._maprfs_util.maprfs_du( 

410 snapshot_path)) / units.Gi) 

411 return {'size': size} 

412 except exception.ProcessExecutionError: 

413 msg = _("Manage existing share snapshot failed.") 

414 LOG.exception(msg) 

415 raise exception.MapRFSException(msg=msg) 

416 

417 def _update_share_stats(self): 

418 """Retrieves stats info of share directories group.""" 

419 try: 

420 total, free = self._maprfs_util.fs_capacity() 

421 except exception.ProcessExecutionError: 

422 msg = _('Failed to check MapRFS capacity info.') 

423 LOG.exception(msg) 

424 raise exception.MapRFSException(msg=msg) 

425 total_capacity_gb = int(math.ceil(float(total) / units.Gi)) 

426 free_capacity_gb = int(math.floor(float(free) / units.Gi)) 

427 data = { 

428 'share_backend_name': self.backend_name, 

429 'storage_protocol': 'MAPRFS', 

430 'driver_handles_share_servers': self.driver_handles_share_servers, 

431 'vendor_name': 'MapR Technologies', 

432 'driver_version': '1.0', 

433 'total_capacity_gb': total_capacity_gb, 

434 'free_capacity_gb': free_capacity_gb, 

435 'snapshot_support': True, 

436 'create_share_from_snapshot_support': True, 

437 } 

438 

439 super(MapRFSNativeShareDriver, self)._update_share_stats(data)