Coverage for manila/share/drivers/cephfs/driver.py: 79%

738 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:39 +0000

1# Copyright (c) 2016 Red Hat, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16 

17import ipaddress 

18import json 

19import math 

20import re 

21import socket 

22import sys 

23 

24from oslo_config import cfg 

25from oslo_config import types 

26from oslo_log import log 

27from oslo_utils import importutils 

28from oslo_utils import timeutils 

29from oslo_utils import units 

30 

31from manila.common import constants 

32from manila import exception 

33from manila.i18n import _ 

34from manila.message import api as message_api 

35from manila.message import message_field 

36from manila.share import driver 

37from manila.share.drivers import ganesha 

38from manila.share.drivers.ganesha import utils as ganesha_utils 

39from manila.share.drivers import helpers as driver_helpers 

40 

41rados = None 

42json_command = None 

43ceph_default_target = None 

44 

45 

46def setup_rados(): 

47 global rados 

48 if not rados: 

49 try: 

50 rados = importutils.import_module('rados') 

51 except ImportError: 

52 raise exception.ShareBackendException( 

53 _("rados python module is not installed")) 

54 

55 

56def setup_json_command(): 

57 global json_command 

58 if not json_command: 

59 try: 

60 json_command = importutils.import_class( 

61 'ceph_argparse.json_command') 

62 except ImportError: 

63 raise exception.ShareBackendException( 

64 _("ceph_argparse python module is not installed")) 

65 

66 

67CEPHX_ACCESS_TYPE = "cephx" 

68 

69# The default Ceph administrative identity 

70CEPH_DEFAULT_AUTH_ID = "admin" 

71 

72DEFAULT_VOLUME_MODE = '755' 

73 

74RADOS_TIMEOUT = 10 

75 

76LOG = log.getLogger(__name__) 

77 

78# Clone statuses 

79CLONE_CREATING = 'creating' 

80CLONE_FAILED = 'failed' 

81CLONE_CANCELED = 'canceled' 

82CLONE_PENDING = 'pending' 

83CLONE_INPROGRESS = 'in-progress' 

84CLONE_COMPLETE = 'complete' 

85 

86cephfs_opts = [ 

87 cfg.StrOpt('cephfs_conf_path', 

88 default="", 

89 help="Fully qualified path to the ceph.conf file."), 

90 cfg.StrOpt('cephfs_cluster_name', 

91 help="The name of the cluster in use, if it is not " 

92 "the default ('ceph')." 

93 ), 

94 cfg.StrOpt('cephfs_auth_id', 

95 default="manila", 

96 help="The name of the ceph auth identity to use." 

97 ), 

98 cfg.StrOpt('cephfs_volume_path_prefix', 

99 deprecated_for_removal=True, 

100 deprecated_since='Wallaby', 

101 deprecated_reason='This option is not used starting with ' 

102 'the Nautilus release of Ceph.', 

103 default="/volumes", 

104 help="The prefix of the cephfs volume path." 

105 ), 

106 cfg.StrOpt('cephfs_protocol_helper_type', 

107 default="CEPHFS", 

108 choices=['CEPHFS', 'NFS'], 

109 ignore_case=True, 

110 help="The type of protocol helper to use. Default is " 

111 "CEPHFS." 

112 ), 

113 cfg.BoolOpt('cephfs_ganesha_server_is_remote', 

114 default=False, 

115 help="Whether the NFS-Ganesha server is remote to the driver.", 

116 deprecated_for_removal=True, 

117 deprecated_since='2025.1', 

118 deprecated_reason="This option is used by the deprecated " 

119 "NFSProtocolHelper"), 

120 cfg.HostAddressOpt('cephfs_ganesha_server_ip', 

121 help="The IP address of the NFS-Ganesha server."), 

122 cfg.StrOpt('cephfs_ganesha_server_username', 

123 default='root', 

124 help="The username to authenticate as in the remote " 

125 "NFS-Ganesha server host.", 

126 deprecated_for_removal=True, 

127 deprecated_since='2025.1', 

128 deprecated_reason="This option is used by the deprecated " 

129 "NFSProtocolHelper"), 

130 cfg.StrOpt('cephfs_ganesha_path_to_private_key', 

131 help="The path of the driver host's private SSH key file.", 

132 deprecated_for_removal=True, 

133 deprecated_since='2025.1', 

134 deprecated_reason="This option is used by the deprecated " 

135 "NFSProtocolHelper"), 

136 cfg.StrOpt('cephfs_ganesha_server_password', 

137 secret=True, 

138 help="The password to authenticate as the user in the remote " 

139 "Ganesha server host. This is not required if " 

140 "'cephfs_ganesha_path_to_private_key' is configured.", 

141 deprecated_for_removal=True, 

142 deprecated_since='2025.1', 

143 deprecated_reason="This option is used by the deprecated " 

144 "NFSProtocolHelper"), 

145 cfg.ListOpt('cephfs_ganesha_export_ips', 

146 default=[], 

147 help="List of IPs to export shares. If not supplied, " 

148 "then the value of 'cephfs_ganesha_server_ip' " 

149 "will be used to construct share export locations."), 

150 cfg.StrOpt('cephfs_volume_mode', 

151 default=DEFAULT_VOLUME_MODE, 

152 help="The read/write/execute permissions mode for CephFS " 

153 "volumes, snapshots, and snapshot groups expressed in " 

154 "Octal as with linux 'chmod' or 'umask' commands."), 

155 cfg.StrOpt('cephfs_filesystem_name', 

156 help="The name of the filesystem to use, if there are " 

157 "multiple filesystems in the cluster."), 

158 cfg.StrOpt('cephfs_ensure_all_shares_salt', 

159 default="manila_cephfs_reef_caracal", 

160 help="Provide a unique string value to make the driver " 

161 "ensure all of the shares it has created during " 

162 "startup. Ensuring would re-export shares and this " 

163 "action isn't always required, unless something has " 

164 "been administratively modified on CephFS."), 

165 cfg.IntOpt('cephfs_cached_allocated_capacity_update_interval', 

166 min=0, 

167 default=60, 

168 help="The maximum time in seconds that the cached pool " 

169 "data will be considered updated. If it is expired when " 

170 "trying to read the pool data, it must be refreshed.") 

171] 

172 

173cephfsnfs_opts = [ 

174 cfg.StrOpt('cephfs_nfs_cluster_id', 

175 help="The ID of the NFS cluster to use."), 

176] 

177 

178 

179CONF = cfg.CONF 

180CONF.register_opts(cephfs_opts) 

181CONF.register_opts(cephfsnfs_opts) 

182 

183 

184class RadosError(Exception): 

185 """Something went wrong talking to Ceph with librados""" 

186 

187 pass 

188 

189 

190class AllocationCapacityCache(object): 

191 """AllocationCapacityCache for CephFS filesystems. 

192 

193 The cache validity is measured by a stop watch that is 

194 not thread-safe. 

195 """ 

196 

197 def __init__(self, duration): 

198 self._stop_watch = timeutils.StopWatch(duration) 

199 self._cached_allocated_capacity = None 

200 

201 def is_expired(self): 

202 return not self._stop_watch.has_started() or self._stop_watch.expired() 

203 

204 def get_data(self): 

205 return self._cached_allocated_capacity 

206 

207 def update_data(self, cached_allocated_capacity): 

208 if not self._stop_watch.has_started(): 208 ↛ 211line 208 didn't jump to line 211 because the condition on line 208 was always true

209 self._stop_watch.start() 

210 else: 

211 self._stop_watch.restart() 

212 

213 self._cached_allocated_capacity = cached_allocated_capacity 

214 

215 

216def rados_command(rados_client, prefix=None, args=None, 

217 json_obj=False, target=None, inbuf=None): 

218 """Safer wrapper for ceph_argparse.json_command 

219 

220 Raises error exception instead of relying on caller to check return 

221 codes. 

222 

223 Error exception can result from: 

224 * Timeout 

225 * Actual legitimate errors 

226 * Malformed JSON output 

227 

228 return: If json_obj is True, return the decoded JSON object from ceph, 

229 or None if empty string returned. 

230 If json is False, return a decoded string (the data returned by 

231 ceph command) 

232 """ 

233 

234 target = target or ceph_default_target 

235 

236 if args is None: 

237 args = {} 

238 

239 argdict = args.copy() 

240 argdict['format'] = 'json' 

241 

242 if inbuf is None: 

243 inbuf = b'' 

244 

245 LOG.debug("Invoking ceph_argparse.json_command - rados_client=%(cl)s, " 

246 "target=%(tg)s, prefix='%(pf)s', argdict=%(ad)s, inbuf=%(ib)s, " 

247 "timeout=%(to)s.", 

248 {"cl": rados_client, "tg": target, "pf": prefix, "ad": argdict, 

249 "ib": inbuf, "to": RADOS_TIMEOUT}) 

250 

251 try: 

252 ret, outbuf, outs = json_command(rados_client, 

253 target=target, 

254 prefix=prefix, 

255 argdict=argdict, 

256 inbuf=inbuf, 

257 timeout=RADOS_TIMEOUT) 

258 if ret != 0: 

259 raise rados.Error(outs, ret) 

260 if not json_obj: 

261 result = outbuf.decode().strip() 

262 else: 

263 if outbuf: 

264 result = json.loads(outbuf.decode().strip()) 

265 else: 

266 result = None 

267 except Exception as e: 

268 msg = _("json_command failed - prefix=%(pfx)s, argdict=%(ad)s - " 

269 "exception message: %(ex)s." % 

270 {"pfx": prefix, "ad": argdict, "ex": e}) 

271 raise exception.ShareBackendException(msg) 

272 

273 return result 

274 

275 

276class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin, 

277 driver.ShareDriver): 

278 """Driver for the Ceph Filesystem.""" 

279 

280 def __init__(self, *args, **kwargs): 

281 super(CephFSDriver, self).__init__(False, *args, **kwargs) 

282 self.backend_name = self.configuration.safe_get( 

283 'share_backend_name') or 'CephFS' 

284 

285 setup_rados() 

286 setup_json_command() 

287 self._rados_client = None 

288 # name of the filesystem/volume used by the driver 

289 self._volname = None 

290 self._ceph_mon_version = None 

291 self.configuration.append_config_values(cephfs_opts) 

292 self.configuration.append_config_values(cephfsnfs_opts) 

293 self._cached_allocated_capacity_gb = None 

294 self.private_storage = kwargs.get('private_storage') 

295 

296 try: 

297 int(self.configuration.cephfs_volume_mode, 8) 

298 except ValueError: 

299 msg = _("Invalid CephFS volume mode %s") 

300 raise exception.BadConfigurationException( 

301 msg % self.configuration.cephfs_volume_mode) 

302 

303 self._cephfs_volume_mode = self.configuration.cephfs_volume_mode 

304 self.ipv6_implemented = True 

305 

306 def do_setup(self, context): 

307 if self.configuration.cephfs_protocol_helper_type.upper() == "CEPHFS": 

308 protocol_helper_class = getattr( 

309 sys.modules[__name__], 'NativeProtocolHelper') 

310 else: 

311 # FIXME(vkmc) we intent to replace NFSProtocolHelper 

312 # with NFSClusterProtocolHelper helper in BB/CC release 

313 if self.configuration.cephfs_nfs_cluster_id is None: 

314 protocol_helper_class = getattr( 

315 sys.modules[__name__], 'NFSProtocolHelper') 

316 else: 

317 protocol_helper_class = getattr( 

318 sys.modules[__name__], 'NFSClusterProtocolHelper') 

319 

320 self.setup_default_ceph_cmd_target() 

321 

322 self.protocol_helper = protocol_helper_class( 

323 self._execute, 

324 self.configuration, 

325 rados_client=self.rados_client, 

326 volname=self.volname) 

327 

328 self.protocol_helper.init_helper() 

329 allocation_capacity_gb = self._get_cephfs_filesystem_allocation() 

330 self._cached_allocated_capacity_gb = AllocationCapacityCache( 

331 self.configuration.cephfs_cached_allocated_capacity_update_interval 

332 ) 

333 self._cached_allocated_capacity_gb.update_data(allocation_capacity_gb) 

334 

335 def check_for_setup_error(self): 

336 """Returns an error if prerequisites aren't met.""" 

337 self.protocol_helper.check_for_setup_error() 

338 

339 def _get_cephfs_filesystem_allocation(self): 

340 allocated_capacity_gb = 0 

341 argdict = {"vol_name": self.volname} 

342 subvolumes = rados_command( 

343 self.rados_client, "fs subvolume ls", argdict, json_obj=True) 

344 for sub_vol in subvolumes: 

345 argdict = {"vol_name": self.volname, "sub_name": sub_vol["name"]} 

346 sub_info = rados_command( 

347 self.rados_client, "fs subvolume info", argdict, json_obj=True) 

348 size = sub_info.get('bytes_quota', 0) 

349 if size == "infinite": 

350 # If we have a share that has infinite quota, we should not 

351 # add that to the allocated capacity as that would make the 

352 # scheduler think this backend is full. 

353 continue 

354 allocated_capacity_gb += round(int(size) / units.Gi, 2) 

355 return allocated_capacity_gb 

356 

357 def _update_share_stats(self): 

358 stats = self.rados_client.get_cluster_stats() 

359 

360 total_capacity_gb = round(stats['kb'] / units.Mi, 2) 

361 free_capacity_gb = round(stats['kb_avail'] / units.Mi, 2) 

362 if self._cached_allocated_capacity_gb.is_expired(): 

363 allocated_capacity_gb = self._get_cephfs_filesystem_allocation() 

364 self._cached_allocated_capacity_gb.update_data( 

365 allocated_capacity_gb 

366 ) 

367 else: 

368 allocated_capacity_gb = ( 

369 self._cached_allocated_capacity_gb.get_data() 

370 ) 

371 

372 data = { 

373 'vendor_name': 'Ceph', 

374 'driver_version': '1.0', 

375 'share_backend_name': self.backend_name, 

376 'storage_protocol': self.configuration.safe_get( 

377 'cephfs_protocol_helper_type'), 

378 'pools': [ 

379 { 

380 'pool_name': 'cephfs', 

381 'total_capacity_gb': total_capacity_gb, 

382 'free_capacity_gb': free_capacity_gb, 

383 'allocated_capacity_gb': allocated_capacity_gb, 

384 'qos': 'False', 

385 'reserved_percentage': self.configuration.safe_get( 

386 'reserved_share_percentage'), 

387 'reserved_snapshot_percentage': 

388 self.configuration.safe_get( 

389 'reserved_share_from_snapshot_percentage') or 

390 self.configuration.safe_get( 

391 'reserved_share_percentage'), 

392 'reserved_share_extend_percentage': 

393 self.configuration.safe_get( 

394 'reserved_share_extend_percentage') or 

395 self.configuration.safe_get( 

396 'reserved_share_percentage'), 

397 'dedupe': [False], 

398 'compression': [False], 

399 'thin_provisioning': [True] 

400 } 

401 ], 

402 'total_capacity_gb': total_capacity_gb, 

403 'free_capacity_gb': free_capacity_gb, 

404 'allocated_capacity_gb': allocated_capacity_gb, 

405 'snapshot_support': True, 

406 'create_share_from_snapshot_support': True, 

407 } 

408 super( # pylint: disable=no-member 

409 CephFSDriver, self)._update_share_stats(data) 

410 

411 def _to_bytes(self, gigs): 

412 """Convert a Manila size into bytes. 

413 

414 Manila uses gibibytes everywhere. 

415 

416 :param gigs: integer number of gibibytes. 

417 :return: integer number of bytes. 

418 """ 

419 return gigs * units.Gi 

420 

421 def _get_subvolume_name(self, share_id): 

422 try: 

423 subvolume_name = self.private_storage.get( 

424 share_id, "subvolume_name") 

425 except Exception: 

426 return share_id 

427 # Subvolume name could be None, so in case it is, return share_id 

428 return subvolume_name or share_id 

429 

430 def _get_subvolume_snapshot_name(self, snapshot_id): 

431 try: 

432 subvolume_snapshot_name = self.private_storage.get( 

433 snapshot_id, "subvolume_snapshot_name" 

434 ) 

435 except Exception: 

436 return snapshot_id 

437 return subvolume_snapshot_name or snapshot_id 

438 

439 def _get_export_locations(self, share, subvolume_name=None): 

440 """Get the export location for a share. 

441 

442 :param share: a manila share. 

443 :return: the export location for a share. 

444 """ 

445 

446 subvolume_name = subvolume_name or share["id"] 

447 # get path of FS subvolume/share 

448 argdict = { 

449 "vol_name": self.volname, 

450 "sub_name": subvolume_name 

451 } 

452 if share['share_group_id'] is not None: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 argdict.update({"group_name": share["share_group_id"]}) 

454 

455 subvolume_path = rados_command( 

456 self.rados_client, "fs subvolume getpath", argdict) 

457 

458 return self.protocol_helper.get_export_locations(share, subvolume_path) 

459 

460 def get_optional_share_creation_data(self, share, share_server=None): 

461 """Get the additional properties to be set in a share. 

462 

463 :return: the metadata to be set in share. 

464 """ 

465 

466 return self.protocol_helper.get_optional_share_creation_data(share) 

467 

468 def setup_default_ceph_cmd_target(self): 

469 global ceph_default_target 

470 if not ceph_default_target: 

471 ceph_default_target = ('mon-mgr', ) 

472 

473 try: 

474 ceph_major_version = self.ceph_mon_version['major'] 

475 except Exception: 

476 msg = _("Error reading ceph version to set the default " 

477 "target. Please check your Ceph backend is reachable.") 

478 raise exception.ShareBackendException(msg=msg) 

479 

480 if ceph_major_version == '14': 

481 ceph_default_target = ('mgr', ) 

482 elif ceph_major_version < '14': 

483 msg = _("CephFSDriver does not support Ceph " 

484 "cluster version less than 14.x (Nautilus)") 

485 raise exception.ShareBackendException(msg=msg) 

486 

487 @property 

488 def ceph_mon_version(self): 

489 if self._ceph_mon_version: 489 ↛ 490line 489 didn't jump to line 490 because the condition on line 489 was never true

490 return self._ceph_mon_version 

491 

492 self._ceph_mon_version = {} 

493 

494 output = rados_command(self.rados_client, "version", target=('mon', )) 

495 

496 version_str = json.loads(output)["version"] 

497 

498 p = re.compile(r"ceph version (\d+)\.(\d+)\.(\d+)") 

499 major, minor, extra = p.match(version_str).groups() 

500 self._ceph_mon_version['major'] = major 

501 self._ceph_mon_version['minor'] = minor 

502 self._ceph_mon_version['extra'] = extra 

503 

504 return self._ceph_mon_version 

505 

506 @property 

507 def rados_client(self): 

508 if self._rados_client: 

509 return self._rados_client 

510 

511 conf_path = self.configuration.safe_get('cephfs_conf_path') 

512 cluster_name = self.configuration.safe_get('cephfs_cluster_name') 

513 auth_id = self.configuration.safe_get('cephfs_auth_id') 

514 self._rados_client = rados.Rados( 

515 name="client.{0}".format(auth_id), 

516 clustername=cluster_name, 

517 conffile=conf_path, 

518 conf={} 

519 ) 

520 

521 LOG.info("[%(be)s] Ceph client found, connecting...", 

522 {"be": self.backend_name}) 

523 try: 

524 if self._rados_client.state != "connected": 524 ↛ 532line 524 didn't jump to line 532 because the condition on line 524 was always true

525 self._rados_client.connect() 

526 except Exception: 

527 self._rados_client = None 

528 raise exception.ShareBackendException( 

529 "[%(be)s] Ceph client failed to connect.", 

530 {"be": self.backend_name}) 

531 else: 

532 LOG.info("[%(be)s] Ceph client connection complete.", 

533 {"be": self.backend_name}) 

534 

535 return self._rados_client 

536 

537 @property 

538 def volname(self): 

539 # Name of the CephFS volume/filesystem where the driver creates 

540 # manila entities such as shares, sharegroups, snapshots, etc. 

541 if self._volname: 

542 return self._volname 

543 

544 self._volname = self.configuration.safe_get('cephfs_filesystem_name') 

545 if not self._volname: 

546 out = rados_command( 

547 self.rados_client, "fs volume ls", json_obj=True) 

548 if len(out) == 1: 

549 self._volname = out[0]['name'] 

550 else: 

551 if len(out) > 1: 

552 msg = _("Specify Ceph filesystem name using " 

553 "'cephfs_filesystem_name' driver option.") 

554 else: 

555 msg = _("No Ceph filesystem found.") 

556 raise exception.ShareBackendException(msg=msg) 

557 

558 return self._volname 

559 

560 def create_share(self, context, share, share_server=None): 

561 """Create a CephFS volume. 

562 

563 :param context: A RequestContext. 

564 :param share: A Share. 

565 :param share_server: Always None for CephFS native. 

566 :return: The export locations dictionary. 

567 """ 

568 requested_proto = share['share_proto'].upper() 

569 supported_proto = ( 

570 self.configuration.cephfs_protocol_helper_type.upper()) 

571 if (requested_proto != supported_proto): 

572 msg = _("Share protocol %s is not supported.") % requested_proto 

573 raise exception.ShareBackendException(msg=msg) 

574 size = self._to_bytes(share['size']) 

575 

576 LOG.debug("[%(be)s]: create_share: id=%(id)s, size=%(sz)s, " 

577 "group=%(gr)s.", 

578 {"be": self.backend_name, "id": share['id'], 

579 "sz": share['size'], "gr": share['share_group_id']}) 

580 

581 # create FS subvolume/share 

582 argdict = { 

583 "vol_name": self.volname, 

584 "sub_name": share["id"], 

585 "size": size, 

586 "namespace_isolated": True, 

587 "mode": self._cephfs_volume_mode 

588 } 

589 

590 if share['share_group_id'] is not None: 590 ↛ 591line 590 didn't jump to line 591 because the condition on line 590 was never true

591 argdict.update({"group_name": share["share_group_id"]}) 

592 

593 rados_command(self.rados_client, "fs subvolume create", argdict) 

594 

595 return self._get_export_locations(share) 

596 

597 def _get_subvolume_size_in_gb(self, subvolume_size): 

598 """Returns the size of the subvolume in GB.""" 

599 # There is a chance that we would end up with 2.5gb for example, so 

600 # we round it up 

601 return int(math.ceil(int(subvolume_size) / units.Gi)) 

602 

603 def manage_existing(self, share, driver_options): 

604 # bring FS subvolume/share under manila management 

605 LOG.debug("[%(be)s]: manage_existing: id=%(id)s.", 

606 {"be": self.backend_name, "id": share['id']}) 

607 

608 # Subvolume name must be provided. 

609 subvolume_name = share['export_locations'][0]['path'] 

610 if not subvolume_name: 

611 raise exception.ShareBackendException( 

612 "The subvolume name must be provided as a 'export_path' while " 

613 "managing shares.") 

614 

615 argdict = { 

616 "vol_name": self.volname, 

617 "sub_name": subvolume_name, 

618 } 

619 

620 subvolume_info = {} 

621 # Try to get the subvolume info in the ceph backend 

622 try: 

623 subvolume_info = rados_command( 

624 self.rados_client, "fs subvolume info", argdict, json_obj=True) 

625 except exception.ShareBackendException as e: 

626 # Couldn't find a subvolume with the name provided. 

627 if 'does not exist' in str(e).lower(): 627 ↛ 633line 627 didn't jump to line 633 because the condition on line 627 was always true

628 msg = ("Subvolume %(subvol)s cannot be found on the " 

629 "backend." % {'subvol': subvolume_name}) 

630 raise exception.ShareBackendException(msg=msg) 

631 

632 # Check if share mode matches 

633 if subvolume_info.get('mode') != self._cephfs_volume_mode: 633 ↛ 637line 633 didn't jump to line 637 because the condition on line 633 was always true

634 LOG.info("Subvolume %(subvol)s mode is different from what is " 

635 "configured in Manila.") 

636 

637 subvolume_size = subvolume_info.get('bytes_quota') 

638 

639 # We need to resize infinite subvolumes, as Manila doesn't support it 

640 if isinstance(subvolume_size, str) and subvolume_size == "infinite": 

641 try: 

642 # Default resize gb must be configured 

643 new_size = driver_options.get('size') 

644 if not new_size or new_size <= 0: 

645 msg = ("subvolume %s has infinite size and a valid " 

646 "integer value was not added to the driver_options " 

647 "arg. Please provide a 'size' in the driver " 

648 "options and try again." % subvolume_name) 

649 raise exception.ShareBackendException(msg=msg) 

650 

651 # Attempt resizing the subvolume 

652 self._resize_share(share, new_size, no_shrink=True) 

653 subvolume_size = new_size 

654 except exception.ShareShrinkingPossibleDataLoss: 

655 msg = ("Could not resize the subvolume using the provided " 

656 "size, as data could be lost. Please update it and " 

657 "try again.") 

658 LOG.exception(msg) 

659 raise 

660 except exception.ShareBackendException: 

661 raise 

662 else: 

663 if int(subvolume_size) % units.Gi == 0: 

664 # subvolume_size is an integer GB, no need to resize subvolume 

665 subvolume_size = self._get_subvolume_size_in_gb(subvolume_size) 

666 else: 

667 # subvolume size is not an integer GB. need to resize subvolume 

668 new_size_gb = self._get_subvolume_size_in_gb(subvolume_size) 

669 LOG.info( 

670 "Subvolume %(subvol)s is being resized to %(new_size)s " 

671 "GB.", { 

672 'subvol': subvolume_name, 

673 'new_size': new_size_gb 

674 } 

675 ) 

676 self._resize_share(share, new_size_gb, no_shrink=True) 

677 subvolume_size = new_size_gb 

678 

679 share_metadata = {"subvolume_name": subvolume_name} 

680 self.private_storage.update(share['id'], share_metadata) 

681 

682 export_locations = self._get_export_locations( 

683 share, subvolume_name=subvolume_name 

684 ) 

685 

686 managed_share = { 

687 "size": subvolume_size, 

688 "export_locations": export_locations 

689 } 

690 return managed_share 

691 

692 def manage_existing_snapshot(self, snapshot, driver_options): 

693 # bring FS subvolume/share under manila management 

694 LOG.debug("[%(be)s]: manage_existing_snapshot: id=%(id)s.", 

695 {"be": self.backend_name, "id": snapshot['id']}) 

696 

697 # Subvolume name must be provided. 

698 sub_snapshot_name = snapshot.get('provider_location', None) 

699 if not sub_snapshot_name: 

700 raise exception.ShareBackendException( 

701 "The subvolume snapshot name must be provided as the " 

702 "'provider_location' while managing snapshots.") 

703 

704 sub_name = self._get_subvolume_name(snapshot['share_instance_id']) 

705 

706 argdict = { 

707 "vol_name": self.volname, 

708 "sub_name": sub_name, 

709 } 

710 

711 # Try to get the subvolume info in the ceph backend, this is useful for 

712 # us to get the size for the snapshot. 

713 try: 

714 rados_command( 

715 self.rados_client, "fs subvolume info", argdict, json_obj=True) 

716 except exception.ShareBackendException as e: 

717 # Couldn't find a subvolume with the name provided. 

718 if 'does not exist' in str(e).lower(): 718 ↛ 723line 718 didn't jump to line 723 because the condition on line 718 was always true

719 msg = ("Subvolume %(subvol)s cannot be found on the " 

720 "backend." % {'subvol': sub_name}) 

721 raise exception.ShareBackendException(msg=msg) 

722 

723 sub_snap_info_argdict = { 

724 "vol_name": self.volname, 

725 "sub_name": sub_name, 

726 "snap_name": sub_snapshot_name 

727 } 

728 # Shares/subvolumes already managed by manila will never have 

729 # infinite as their bytes_quota, so no need for extra precaution. 

730 try: 

731 managed_subvolume_snapshot = rados_command( 

732 self.rados_client, "fs subvolume snapshot info", 

733 sub_snap_info_argdict, json_obj=True 

734 ) 

735 except exception.ShareBackendException as e: 

736 # Couldn't find a subvolume snapshot with the name provided. 

737 if 'does not exist' in str(e).lower(): 737 ↛ 742line 737 didn't jump to line 742 because the condition on line 737 was always true

738 msg = ("Subvolume snapshot %(snap)s cannot be found on the " 

739 "backend." % {'snap': sub_snapshot_name}) 

740 raise exception.ShareBackendException(msg=msg) 

741 

742 snapshot_metadata = {"subvolume_snapshot_name": sub_snapshot_name} 

743 self.private_storage.update( 

744 snapshot['snapshot_id'], snapshot_metadata 

745 ) 

746 

747 # NOTE(carloss): fs subvolume snapshot info command does not return 

748 # the snapshot size, so we reuse the share size until this is not 

749 # available for us. 

750 managed_snapshot = {'provider_location': sub_snapshot_name} 

751 if managed_subvolume_snapshot.get('bytes_quota') is not None: 751 ↛ 752line 751 didn't jump to line 752 because the condition on line 751 was never true

752 managed_snapshot['size'] = self._get_subvolume_size_in_gb( 

753 managed_subvolume_snapshot['bytes_quota']) 

754 

755 return managed_snapshot 

756 

757 def _need_to_cancel_clone(self, share, clone_name): 

758 # Is there an ongoing clone operation that needs to be canceled 

759 # so we can delete the share? 

760 need_to_cancel_clone = False 

761 

762 argdict = { 

763 "vol_name": self.volname, 

764 "clone_name": clone_name, 

765 } 

766 if share['share_group_id'] is not None: 766 ↛ 767line 766 didn't jump to line 767 because the condition on line 766 was never true

767 argdict.update({"group_name": share["share_group_id"]}) 

768 

769 try: 

770 status = rados_command( 

771 self.rados_client, "fs clone status", argdict) 

772 if status in (CLONE_PENDING, CLONE_INPROGRESS): 

773 need_to_cancel_clone = True 

774 except exception.ShareBackendException as e: 

775 # Trying to get clone status on a regular subvolume is expected 

776 # to fail. 

777 if 'not allowed on subvolume' not in str(e).lower(): 777 ↛ 778line 777 didn't jump to line 778 because the condition on line 777 was never true

778 raise exception.ShareBackendException( 

779 "Failed to remove share.") 

780 

781 return need_to_cancel_clone 

782 

783 def delete_share(self, context, share, share_server=None): 

784 # remove FS subvolume/share 

785 LOG.debug("[%(be)s]: delete_share: id=%(id)s, group=%(gr)s.", 

786 {"be": self.backend_name, "id": share['id'], 

787 "gr": share['share_group_id']}) 

788 

789 clone_name = self._get_subvolume_name(share['id']) 

790 if self._need_to_cancel_clone(share, clone_name): 

791 try: 

792 argdict = { 

793 "vol_name": self.volname, 

794 "clone_name": clone_name, 

795 "force": True, 

796 } 

797 if share['share_group_id'] is not None: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true

798 argdict.update({"group_name": share["share_group_id"]}) 

799 

800 rados_command(self.rados_client, "fs clone cancel", argdict) 

801 except rados.Error: 

802 raise exception.ShareBackendException( 

803 "Failed to cancel clone operation.") 

804 

805 argdict = { 

806 "vol_name": self.volname, 

807 "sub_name": self._get_subvolume_name(share["id"]), 

808 # We want to clean up the share even if the subvolume is 

809 # not in a good state. 

810 "force": True, 

811 } 

812 if share['share_group_id'] is not None: 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true

813 argdict.update({"group_name": share["share_group_id"]}) 

814 

815 rados_command(self.rados_client, "fs subvolume rm", argdict) 

816 

817 def update_access(self, context, share, access_rules, add_rules, 

818 delete_rules, update_rules, share_server=None): 

819 sub_name = self._get_subvolume_name(share['id']) 

820 return self.protocol_helper.update_access( 

821 context, share, access_rules, add_rules, delete_rules, 

822 update_rules, share_server=share_server, sub_name=sub_name) 

823 

824 def get_backend_info(self, context): 

825 return self.protocol_helper.get_backend_info(context) 

826 

827 def ensure_shares(self, context, shares): 

828 share_updates = {} 

829 for share in shares: 

830 share_updates[share['id']] = { 

831 'reapply_access_rules': 

832 self.protocol_helper.reapply_rules_while_ensuring_shares, 

833 } 

834 try: 

835 share_metadata = ( 

836 self.get_optional_share_creation_data(share).get( 

837 "metadata", {}) 

838 ) 

839 share_updates[share['id']].update({ 

840 'export_locations': self._get_export_locations(share), 

841 "metadata": share_metadata 

842 }) 

843 except exception.ShareBackendException as e: 

844 if 'does not exist' in str(e).lower(): 844 ↛ 829line 844 didn't jump to line 829 because the condition on line 844 was always true

845 msg = ("Share instance %(si)s belonging to share " 

846 "%(share)s cannot be found on the backend.") 

847 msg_payload = {'si': share['id'], 

848 'share': share['share_id']} 

849 LOG.exception(msg, msg_payload) 

850 share_updates[share['id']] = { 

851 'status': constants.STATUS_ERROR, 

852 } 

853 return share_updates 

854 

855 def _resize_share(self, share, new_size, no_shrink=False): 

856 argdict = { 

857 "vol_name": self.volname, 

858 "sub_name": self._get_subvolume_name(share["id"]), 

859 "new_size": self._to_bytes(new_size), 

860 } 

861 if share["share_group_id"] is not None: 861 ↛ 862line 861 didn't jump to line 862 because the condition on line 861 was never true

862 argdict.update({"group_name": share["share_group_id"]}) 

863 

864 if no_shrink: 

865 argdict.update({"no_shrink": True}) 

866 

867 try: 

868 rados_command(self.rados_client, "fs subvolume resize", argdict) 

869 except exception.ShareBackendException as e: 

870 if 'would be lesser than' in str(e).lower(): 870 ↛ 873line 870 didn't jump to line 873 because the condition on line 870 was always true

871 raise exception.ShareShrinkingPossibleDataLoss( 

872 share_id=share['id']) 

873 raise 

874 

875 def extend_share(self, share, new_size, share_server=None): 

876 # resize FS subvolume/share 

877 LOG.debug("[%(be)s]: extend_share: share=%(id)s, size=%(sz)s.", 

878 {"be": self.backend_name, "id": share['id'], 

879 "sz": new_size}) 

880 

881 self._resize_share(share, new_size) 

882 

883 def shrink_share(self, share, new_size, share_server=None): 

884 # resize FS subvolume/share 

885 LOG.debug("[%(be)s]: shrink_share: share=%(id)s, size=%(sz)s.", 

886 {"be": self.backend_name, "id": share['id'], 

887 "sz": new_size}) 

888 

889 self._resize_share(share, new_size, no_shrink=True) 

890 

891 def create_snapshot(self, context, snapshot, share_server=None): 

892 # create a FS snapshot 

893 LOG.debug("[%(be)s]: create_snapshot: original share=%(id)s, " 

894 "snapshot=%(sn)s.", 

895 {"be": self.backend_name, "id": snapshot['share_id'], 

896 "sn": snapshot['id']}) 

897 

898 argdict = { 

899 "vol_name": self.volname, 

900 "sub_name": self._get_subvolume_name(snapshot["share_id"]), 

901 "snap_name": snapshot["snapshot_id"], 

902 } 

903 

904 rados_command( 

905 self.rados_client, "fs subvolume snapshot create", argdict) 

906 

907 return {"provider_location": snapshot["snapshot_id"]} 

908 

909 def delete_snapshot(self, context, snapshot, share_server=None): 

910 # delete a FS snapshot 

911 LOG.debug("[%(be)s]: delete_snapshot: snapshot=%(id)s.", 

912 {"be": self.backend_name, "id": snapshot['id']}) 

913 

914 snapshot_name = self._get_subvolume_snapshot_name( 

915 snapshot['snapshot_id'] 

916 ) 

917 # FIXME(vkmc) remove this in CC (next tick) release. 

918 legacy_snap_name = "_".join([snapshot["snapshot_id"], snapshot["id"]]) 

919 

920 argdict_legacy = { 

921 "vol_name": self.volname, 

922 "sub_name": self._get_subvolume_name(snapshot["share_id"]), 

923 "snap_name": legacy_snap_name, 

924 "force": True, 

925 } 

926 

927 # try removing snapshot using legacy naming 

928 rados_command( 

929 self.rados_client, "fs subvolume snapshot rm", argdict_legacy) 

930 

931 # in case it's a snapshot with new naming, retry remove with new name 

932 argdict = argdict_legacy.copy() 

933 argdict.update({"snap_name": snapshot_name}) 

934 rados_command(self.rados_client, "fs subvolume snapshot rm", argdict) 

935 

936 def create_share_group(self, context, sg_dict, share_server=None): 

937 # delete a FS group 

938 LOG.debug("[%(be)s]: create_share_group: share_group=%(id)s.", 

939 {"be": self.backend_name, "id": sg_dict['id']}) 

940 

941 argdict = { 

942 "vol_name": self.volname, 

943 "group_name": sg_dict['id'], 

944 "mode": self._cephfs_volume_mode, 

945 } 

946 

947 rados_command(self.rados_client, "fs subvolumegroup create", argdict) 

948 

949 def delete_share_group(self, context, sg_dict, share_server=None): 

950 # delete a FS group 

951 LOG.debug("[%(be)s]: delete_share_group: share_group=%(id)s.", 

952 {"be": self.backend_name, "id": sg_dict['id']}) 

953 

954 argdict = { 

955 "vol_name": self.volname, 

956 "group_name": sg_dict['id'], 

957 "force": True, 

958 } 

959 

960 rados_command(self.rados_client, "fs subvolumegroup rm", argdict) 

961 

962 def delete_share_group_snapshot(self, context, snap_dict, 

963 share_server=None): 

964 # delete a FS group snapshot 

965 LOG.debug("[%(be)s]: delete_share_group_snapshot: " 

966 "share_group=%(sg_id)s, snapshot=%(sn)s.", 

967 {"be": self.backend_name, "sg_id": snap_dict['id'], 

968 "sn": snap_dict["share_group_id"]}) 

969 

970 argdict = { 

971 "vol_name": self.volname, 

972 "group_name": snap_dict["share_group_id"], 

973 "snap_name": snap_dict["id"], 

974 "force": True, 

975 } 

976 

977 rados_command( 

978 self.rados_client, "fs subvolumegroup snapshot rm", argdict) 

979 

980 return None, [] 

981 

982 def create_share_group_snapshot(self, context, snap_dict, 

983 share_server=None): 

984 # create a FS group snapshot 

985 LOG.debug("[%(be)s]: create_share_group_snapshot: share_group=%(id)s, " 

986 "snapshot=%(sn)s.", 

987 {"be": self.backend_name, "id": snap_dict['share_group_id'], 

988 "sn": snap_dict["id"]}) 

989 

990 msg = _("Share group snapshot feature is no longer supported in " 

991 "mainline CephFS (existing group snapshots can still be " 

992 "listed and deleted).") 

993 raise exception.ShareBackendException(msg=msg) 

994 

995 def _get_clone_status(self, share): 

996 """Check the status of a newly cloned share.""" 

997 clone_name = self._get_subvolume_name(share["id"]) 

998 argdict = { 

999 "vol_name": self.volname, 

1000 "clone_name": clone_name 

1001 } 

1002 if share['share_group_id'] is not None: 1002 ↛ 1003line 1002 didn't jump to line 1003 because the condition on line 1002 was never true

1003 argdict.update({"group_name": share["share_group_id"]}) 

1004 

1005 out = rados_command(self.rados_client, 

1006 "fs clone status", argdict, True) 

1007 return out['status']['state'] 

1008 

1009 def _update_create_from_snapshot_status(self, share): 

1010 updates = { 

1011 'status': constants.STATUS_ERROR, 

1012 'progress': None, 

1013 'export_locations': [] 

1014 } 

1015 status = self._get_clone_status(share) 

1016 if status == CLONE_COMPLETE: 1016 ↛ 1017line 1016 didn't jump to line 1017 because the condition on line 1016 was never true

1017 updates['status'] = constants.STATUS_AVAILABLE 

1018 updates['progress'] = '100%' 

1019 updates['export_locations'] = self._get_export_locations(share) 

1020 elif status in (CLONE_PENDING, CLONE_INPROGRESS): 1020 ↛ 1024line 1020 didn't jump to line 1024 because the condition on line 1020 was always true

1021 updates['status'] = constants.STATUS_CREATING_FROM_SNAPSHOT 

1022 else: 

1023 # error if clone operation is not progressing or completed 

1024 raise exception.ShareBackendException( 

1025 "rados client clone of snapshot [%(sn)s}] to new " 

1026 "share [%(shr)s}] did not complete successfully." % 

1027 {"sn": share["snapshot_id"], "shr": share["id"]}) 

1028 return updates 

1029 

1030 def get_share_status(self, share, share_server=None): 

1031 """Returns the current status for a share. 

1032 

1033 :param share: a manila share. 

1034 :param share_server: a manila share server (not currently supported). 

1035 :returns: manila share status. 

1036 """ 

1037 

1038 if share['status'] != constants.STATUS_CREATING_FROM_SNAPSHOT: 1038 ↛ 1042line 1038 didn't jump to line 1042 because the condition on line 1038 was always true

1039 LOG.warning("Caught an unexpected share status '%s' during share " 

1040 "status update routine. Skipping.", share['status']) 

1041 return 

1042 return self._update_create_from_snapshot_status(share) 

1043 

1044 def create_share_from_snapshot(self, context, share, snapshot, 

1045 share_server=None, parent_share=None): 

1046 """Create a CephFS subvolume from a snapshot""" 

1047 

1048 LOG.debug("[%(be)s]: create_share_from_snapshot: id=%(id)s, " 

1049 "snapshot=%(sn)s, size=%(sz)s, group=%(gr)s.", 

1050 {"be": self.backend_name, "id": share['id'], 

1051 "sn": snapshot['id'], "sz": share['size'], 

1052 "gr": share['share_group_id']}) 

1053 

1054 argdict = { 

1055 "vol_name": self.volname, 

1056 "sub_name": self._get_subvolume_name(parent_share["id"]), 

1057 "snap_name": self._get_subvolume_snapshot_name( 

1058 snapshot["snapshot_id"]), 

1059 "target_sub_name": self._get_subvolume_name(share["id"]) 

1060 } 

1061 if share['share_group_id'] is not None: 1061 ↛ 1062line 1061 didn't jump to line 1062 because the condition on line 1061 was never true

1062 argdict.update({"group_name": share["share_group_id"]}) 

1063 

1064 rados_command( 

1065 self.rados_client, "fs subvolume snapshot clone", argdict) 

1066 

1067 return self._update_create_from_snapshot_status(share) 

1068 

1069 def __del__(self): 

1070 if self._rados_client: 

1071 LOG.info("[%(be)s] Ceph client disconnecting...", 

1072 {"be": self.backend_name}) 

1073 self._rados_client.shutdown() 

1074 self._rados_client = None 

1075 LOG.info("[%(be)s] Ceph client disconnected", 

1076 {"be": self.backend_name}) 

1077 

1078 def get_configured_ip_versions(self): 

1079 return self.protocol_helper.get_configured_ip_versions() 

1080 

1081 def transfer_accept(self, context, share, new_user, new_project, 

1082 access_rules=None, share_server=None): 

1083 # CephFS driver cannot transfer shares by preserving access rules 

1084 same_project = share["project_id"] == new_project 

1085 if access_rules and not same_project: 1085 ↛ exitline 1085 didn't return from function 'transfer_accept' because the condition on line 1085 was always true

1086 raise exception.DriverCannotTransferShareWithRules() 

1087 

1088 

1089class NativeProtocolHelper(ganesha.NASHelperBase): 

1090 """Helper class for native CephFS protocol""" 

1091 

1092 supported_access_types = (CEPHX_ACCESS_TYPE, ) 

1093 supported_access_levels = (constants.ACCESS_LEVEL_RW, 

1094 constants.ACCESS_LEVEL_RO) 

1095 reapply_rules_while_ensuring_shares = False 

1096 

1097 def __init__(self, execute, config, **kwargs): 

1098 self.rados_client = kwargs.pop('rados_client') 

1099 self.volname = kwargs.pop('volname') 

1100 self.message_api = message_api.API() 

1101 super(NativeProtocolHelper, self).__init__(execute, config, 

1102 **kwargs) 

1103 

1104 def _init_helper(self): 

1105 pass 

1106 

1107 def check_for_setup_error(self): 

1108 """Returns an error if prerequisites aren't met.""" 

1109 return 

1110 

1111 def get_mon_addrs(self): 

1112 result = [] 

1113 mon_map = rados_command(self.rados_client, "mon dump", json_obj=True, 

1114 target=('mon', )) 

1115 for mon in mon_map['mons']: 

1116 ip_port = mon['addr'].split("/")[0] 

1117 result.append(ip_port) 

1118 

1119 return result 

1120 

1121 def get_backend_info(self, context): 

1122 return { 

1123 "cephfs_ensure_all_shares_salt": 

1124 self.configuration.cephfs_ensure_all_shares_salt, 

1125 "cephfs_filesystem_name": self.volname, 

1126 } 

1127 

1128 def get_export_locations(self, share, subvolume_path): 

1129 # To mount this you need to know the mon IPs and the path to the volume 

1130 mon_addrs = self.get_mon_addrs() 

1131 

1132 export_location = "{addrs}:{path}".format( 

1133 addrs=",".join(mon_addrs), 

1134 path=subvolume_path) 

1135 

1136 LOG.info("Calculated export location for share %(id)s: %(loc)s", 

1137 {"id": share['id'], "loc": export_location}) 

1138 

1139 return { 

1140 'path': export_location, 

1141 'is_admin_only': False, 

1142 'metadata': {}, 

1143 } 

1144 

1145 def get_optional_share_creation_data(self, share, share_server=None): 

1146 return {"metadata": {"__mount_options": f"fs={self.volname}"}} 

1147 

1148 def _allow_access(self, context, share, access, share_server=None, 

1149 sub_name=None): 

1150 if access['access_type'] != CEPHX_ACCESS_TYPE: 

1151 raise exception.InvalidShareAccessType(type=access['access_type']) 

1152 

1153 ceph_auth_id = access['access_to'] 

1154 

1155 # We need to check here rather than the API or Manila Client to see 

1156 # if the ceph_auth_id is the same as the one specified for Manila's 

1157 # usage. This is due to the fact that the API and the Manila client 

1158 # cannot read the contents of the Manila configuration file. If it 

1159 # is the same, we need to error out. 

1160 if ceph_auth_id == CONF.cephfs_auth_id: 

1161 error_message = (_('Ceph authentication ID %s must be different ' 

1162 'than the one the Manila service uses.') % 

1163 ceph_auth_id) 

1164 raise exception.InvalidShareAccess(reason=error_message) 

1165 

1166 argdict = { 

1167 "vol_name": self.volname, 

1168 "sub_name": sub_name, 

1169 "auth_id": ceph_auth_id, 

1170 "tenant_id": share["project_id"], 

1171 } 

1172 if share["share_group_id"] is not None: 1172 ↛ 1173line 1172 didn't jump to line 1173 because the condition on line 1172 was never true

1173 argdict.update({"group_name": share["share_group_id"]}) 

1174 

1175 readonly = access['access_level'] == constants.ACCESS_LEVEL_RO 

1176 

1177 if readonly: 

1178 argdict.update({"access_level": "r"}) 

1179 else: 

1180 argdict.update({"access_level": "rw"}) 

1181 

1182 try: 

1183 auth_result = rados_command( 

1184 self.rados_client, "fs subvolume authorize", argdict) 

1185 except exception.ShareBackendException as e: 

1186 if 'not allowed' in str(e).lower(): 1186 ↛ 1192line 1186 didn't jump to line 1192 because the condition on line 1186 was always true

1187 msg = ("Access to client %(client)s is not allowed. " 

1188 "Reason: %(reason)s") 

1189 msg_payload = {'client': ceph_auth_id, 'reason': e} 

1190 raise exception.InvalidShareAccess( 

1191 reason=msg % msg_payload) 

1192 raise 

1193 

1194 return auth_result 

1195 

1196 def _deny_access(self, context, share, access, share_server=None, 

1197 sub_name=None): 

1198 if access['access_type'] != CEPHX_ACCESS_TYPE: 1198 ↛ 1199line 1198 didn't jump to line 1199 because the condition on line 1198 was never true

1199 LOG.warning("Invalid access type '%(type)s', " 

1200 "ignoring in deny.", 

1201 {"type": access['access_type']}) 

1202 return 

1203 

1204 argdict = { 

1205 "vol_name": self.volname, 

1206 "sub_name": sub_name, 

1207 "auth_id": access['access_to'] 

1208 } 

1209 if share["share_group_id"] is not None: 1209 ↛ 1210line 1209 didn't jump to line 1210 because the condition on line 1209 was never true

1210 argdict.update({"group_name": share["share_group_id"]}) 

1211 

1212 try: 

1213 rados_command(self.rados_client, 

1214 "fs subvolume deauthorize", 

1215 argdict) 

1216 except exception.ShareBackendException as e: 

1217 if "doesn't exist" in e.msg.lower(): 1217 ↛ 1221line 1217 didn't jump to line 1221 because the condition on line 1217 was always true

1218 LOG.warning(f"%{access['access_to']} did not have access to " 

1219 f"share {share['id']}.") 

1220 return 

1221 raise e 

1222 rados_command(self.rados_client, "fs subvolume evict", argdict) 

1223 

1224 def update_access(self, context, share, access_rules, add_rules, 

1225 delete_rules, update_rules, share_server=None, 

1226 sub_name=None): 

1227 access_updates = {} 

1228 

1229 argdict = { 

1230 "vol_name": self.volname, 

1231 "sub_name": sub_name, 

1232 } 

1233 if share["share_group_id"] is not None: 1233 ↛ 1234line 1233 didn't jump to line 1234 because the condition on line 1233 was never true

1234 argdict.update({"group_name": share["share_group_id"]}) 

1235 

1236 if not (add_rules or delete_rules): # recovery/maintenance mode 

1237 add_rules = access_rules 

1238 

1239 existing_auths = None 

1240 

1241 existing_auths = rados_command( 

1242 self.rados_client, "fs subvolume authorized_list", 

1243 argdict, json_obj=True) 

1244 

1245 if existing_auths: 1245 ↛ 1266line 1245 didn't jump to line 1266 because the condition on line 1245 was always true

1246 existing_auth_ids = set() 

1247 for rule in range(len(existing_auths)): 

1248 for cephx_id in existing_auths[rule]: 

1249 existing_auth_ids.add(cephx_id) 

1250 want_auth_ids = set( 

1251 [rule['access_to'] for rule in add_rules]) 

1252 delete_auth_ids = existing_auth_ids.difference( 

1253 want_auth_ids) 

1254 delete_auth_ids_list = delete_auth_ids 

1255 for delete_auth_id in delete_auth_ids_list: 

1256 delete_rules.append( 

1257 { 

1258 'access_to': delete_auth_id, 

1259 'access_type': CEPHX_ACCESS_TYPE, 

1260 }) 

1261 

1262 # During recovery mode, re-authorize share access for auth IDs that 

1263 # were already granted access by the backend. Do this to fetch their 

1264 # access keys and ensure that after recovery, manila and the Ceph 

1265 # backend are in sync. 

1266 for rule in add_rules: 

1267 try: 

1268 access_key = self._allow_access( 

1269 context, share, rule, sub_name=sub_name 

1270 ) 

1271 except (exception.InvalidShareAccessLevel, 

1272 exception.InvalidShareAccessType): 

1273 self.message_api.create( 

1274 context, 

1275 message_field.Action.UPDATE_ACCESS_RULES, 

1276 share['project_id'], 

1277 resource_type=message_field.Resource.SHARE, 

1278 resource_id=share['share_id'], 

1279 detail=message_field.Detail.UNSUPPORTED_CLIENT_ACCESS) 

1280 log_args = {'id': rule['access_id'], 

1281 'access_level': rule['access_level'], 

1282 'access_to': rule['access_to']} 

1283 LOG.exception("Failed to provide %(access_level)s access to " 

1284 "%(access_to)s (Rule ID: %(id)s). Setting rule " 

1285 "to 'error' state.", log_args) 

1286 access_updates.update({rule['access_id']: {'state': 'error'}}) 

1287 except exception.InvalidShareAccess: 

1288 self.message_api.create( 

1289 context, 

1290 message_field.Action.UPDATE_ACCESS_RULES, 

1291 share['project_id'], 

1292 resource_type=message_field.Resource.SHARE, 

1293 resource_id=share['share_id'], 

1294 detail=message_field.Detail.FORBIDDEN_CLIENT_ACCESS) 

1295 log_args = {'id': rule['access_id'], 

1296 'access_level': rule['access_level'], 

1297 'access_to': rule['access_to']} 

1298 LOG.exception("Failed to provide %(access_level)s access to " 

1299 "%(access_to)s (Rule ID: %(id)s). Setting rule " 

1300 "to 'error' state.", log_args) 

1301 access_updates.update({rule['access_id']: {'state': 'error'}}) 

1302 else: 

1303 access_updates.update({ 

1304 rule['access_id']: {'access_key': access_key}, 

1305 }) 

1306 

1307 for rule in delete_rules: 

1308 self._deny_access(context, share, rule, sub_name=sub_name) 

1309 

1310 return access_updates 

1311 

1312 def get_configured_ip_versions(self): 

1313 return [4] 

1314 

1315 

1316class NFSProtocolHelperMixin(): 

1317 

1318 def get_export_locations(self, share, subvolume_path): 

1319 export_locations = [] 

1320 

1321 if not self.export_ips: 1321 ↛ 1324line 1321 didn't jump to line 1324 because the condition on line 1321 was always true

1322 self.export_ips = self._get_export_ips() 

1323 

1324 for export_ip in self.export_ips: 

1325 # Try to escape the export ip. If it fails, means that the 

1326 # `cephfs_ganesha_server_ip` wasn't possibly set and the used 

1327 # address is the hostname 

1328 try: 

1329 server_address = driver_helpers.escaped_address( 

1330 export_ip['ip']) 

1331 except ValueError: 

1332 server_address = export_ip['ip'] 

1333 

1334 export_path = "{server_address}:{mount_path}".format( 

1335 server_address=server_address, mount_path=subvolume_path) 

1336 

1337 LOG.info("Calculated export path for share %(id)s: %(epath)s", 

1338 {"id": share['id'], "epath": export_path}) 

1339 

1340 export_location = { 

1341 'path': export_path, 

1342 'is_admin_only': False, 

1343 'metadata': { 

1344 'preferred': export_ip['preferred'], 

1345 }, 

1346 } 

1347 export_locations.append(export_location) 

1348 return export_locations 

1349 

1350 def get_optional_share_creation_data(self, share, share_server=None): 

1351 return {} 

1352 

1353 def _get_export_path(self, share, sub_name=None): 

1354 """Callback to provide export path.""" 

1355 argdict = { 

1356 "vol_name": self.volname, 

1357 "sub_name": sub_name or share["id"] 

1358 } 

1359 if share["share_group_id"] is not None: 1359 ↛ 1360line 1359 didn't jump to line 1360 because the condition on line 1359 was never true

1360 argdict.update({"group_name": share["share_group_id"]}) 

1361 

1362 path = rados_command( 

1363 self.rados_client, "fs subvolume getpath", argdict) 

1364 

1365 return path 

1366 

1367 def _get_export_pseudo_path(self, share, sub_name=None): 

1368 """Callback to provide pseudo path.""" 

1369 return self._get_export_path(share, sub_name=sub_name) 

1370 

1371 def get_configured_ip_versions(self): 

1372 if not self.configured_ip_versions: 

1373 try: 

1374 if not self.export_ips: 1374 ↛ 1377line 1374 didn't jump to line 1377 because the condition on line 1374 was always true

1375 self.export_ips = self._get_export_ips() 

1376 

1377 for export_ip in self.export_ips: 

1378 self.configured_ip_versions.add( 

1379 ipaddress.ip_address(str(export_ip['ip'])).version) 

1380 except Exception: 

1381 # export_ips contained a hostname, safest thing is to 

1382 # claim support for IPv4 and IPv6 address families 

1383 LOG.warning("Setting configured IP versions to [4, 6] since " 

1384 "a hostname (rather than IP address) was supplied " 

1385 "in 'cephfs_ganesha_server_ip' or " 

1386 "in 'cephfs_ganesha_export_ips'.") 

1387 self.configured_ip_versions = {4, 6} 

1388 return list(self.configured_ip_versions) 

1389 

1390 

1391class NFSProtocolHelper(NFSProtocolHelperMixin, ganesha.GaneshaNASHelper2): 

1392 

1393 shared_data = {} 

1394 supported_protocols = ('NFS',) 

1395 reapply_rules_while_ensuring_shares = True 

1396 

1397 def __init__(self, execute, config_object, **kwargs): 

1398 if config_object.cephfs_ganesha_server_is_remote: 

1399 execute = ganesha_utils.SSHExecutor( 

1400 config_object.cephfs_ganesha_server_ip, 22, None, 

1401 config_object.cephfs_ganesha_server_username, 

1402 password=config_object.cephfs_ganesha_server_password, 

1403 privatekey=config_object.cephfs_ganesha_path_to_private_key) 

1404 else: 

1405 execute = ganesha_utils.RootExecutor(execute) 

1406 

1407 self.ganesha_host = config_object.cephfs_ganesha_server_ip 

1408 if not self.ganesha_host: 

1409 self.ganesha_host = socket.gethostname() 

1410 LOG.info("NFS-Ganesha server's location defaulted to driver's " 

1411 "hostname: %s", self.ganesha_host) 

1412 

1413 super(NFSProtocolHelper, self).__init__(execute, config_object, 

1414 **kwargs) 

1415 

1416 LOG.warning('The NFSProtocolHelper has been deprecated. Starting ' 

1417 'from the 2025.1 release, we will no longer support ' 

1418 'exporting NFS shares through a NFS Ganesha instance ' 

1419 'that not managed by the Ceph orchestrator.') 

1420 if not hasattr(self, 'rados_client'): 1420 ↛ 1422line 1420 didn't jump to line 1422 because the condition on line 1420 was always true

1421 self.rados_client = kwargs.pop('rados_client') 

1422 if not hasattr(self, 'volname'): 1422 ↛ 1424line 1422 didn't jump to line 1424 because the condition on line 1422 was always true

1423 self.volname = kwargs.pop('volname') 

1424 self.export_ips = None 

1425 self.configured_ip_versions = set() 

1426 self.config = config_object 

1427 

1428 def check_for_setup_error(self): 

1429 """Returns an error if prerequisites aren't met.""" 

1430 host_address_obj = types.HostAddress() 

1431 for export_ip in self.config.cephfs_ganesha_export_ips: 

1432 try: 

1433 host_address_obj(export_ip) 

1434 except ValueError: 

1435 msg = (_("Invalid list member of 'cephfs_ganesha_export_ips' " 

1436 "option supplied %s -- not a valid IP address or " 

1437 "hostname.") % export_ip) 

1438 raise exception.InvalidParameterValue(err=msg) 

1439 

1440 def _default_config_hook(self): 

1441 """Callback to provide default export block.""" 

1442 dconf = super(NFSProtocolHelper, self)._default_config_hook() 

1443 conf_dir = ganesha_utils.path_from(__file__, "conf") 

1444 ganesha_utils.patch(dconf, self._load_conf_dir(conf_dir)) 

1445 return dconf 

1446 

1447 def _fsal_hook(self, base, share, access, sub_name=None): 

1448 """Callback to create FSAL subblock.""" 

1449 ceph_auth_id = ''.join(['ganesha-', share['id']]) 

1450 

1451 argdict = { 

1452 "vol_name": self.volname, 

1453 "sub_name": sub_name, 

1454 "auth_id": ceph_auth_id, 

1455 "access_level": "rw", 

1456 "tenant_id": share["project_id"], 

1457 } 

1458 if share["share_group_id"] is not None: 1458 ↛ 1459line 1458 didn't jump to line 1459 because the condition on line 1458 was never true

1459 argdict.update({"group_name": share["share_group_id"]}) 

1460 

1461 auth_result = rados_command( 

1462 self.rados_client, "fs subvolume authorize", argdict) 

1463 

1464 # Restrict Ganesha server's access to only the CephFS subtree or path, 

1465 # corresponding to the manila share, that is to be exported by making 

1466 # Ganesha use Ceph auth IDs with path restricted capabilities to 

1467 # communicate with CephFS. 

1468 return { 

1469 'Name': 'Ceph', 

1470 'User_Id': ceph_auth_id, 

1471 'Secret_Access_Key': auth_result, 

1472 'Filesystem': self.volname 

1473 } 

1474 

1475 def _cleanup_fsal_hook(self, base, share, access, sub_name=None): 

1476 """Callback for FSAL specific cleanup after removing an export.""" 

1477 ceph_auth_id = ''.join(['ganesha-', share['id']]) 

1478 

1479 argdict = { 

1480 "vol_name": self.volname, 

1481 "sub_name": sub_name, 

1482 "auth_id": ceph_auth_id, 

1483 } 

1484 if share["share_group_id"] is not None: 1484 ↛ 1485line 1484 didn't jump to line 1485 because the condition on line 1484 was never true

1485 argdict.update({"group_name": share["share_group_id"]}) 

1486 

1487 rados_command(self.rados_client, "fs subvolume deauthorize", argdict) 

1488 

1489 def _get_export_ips(self): 

1490 ganesha_export_ips = self.config.cephfs_ganesha_export_ips 

1491 if not ganesha_export_ips: 

1492 ganesha_export_ips = [self.ganesha_host] 

1493 

1494 export_ips = [] 

1495 for ip in set(ganesha_export_ips): 

1496 export_ips.append({'ip': ip, 'preferred': False}) 

1497 

1498 return export_ips 

1499 

1500 def get_backend_info(self, context): 

1501 backend_info = { 

1502 "cephfs_ganesha_export_ips": self.config.cephfs_ganesha_export_ips, 

1503 "cephfs_ganesha_server_ip": self.config.cephfs_ganesha_server_ip, 

1504 "cephfs_ensure_all_shares_salt": 

1505 self.configuration.cephfs_ensure_all_shares_salt, 

1506 } 

1507 return backend_info 

1508 

1509 

1510class NFSClusterProtocolHelper(NFSProtocolHelperMixin, ganesha.NASHelperBase): 

1511 

1512 supported_access_types = ('ip', ) 

1513 supported_access_levels = (constants.ACCESS_LEVEL_RW, 

1514 constants.ACCESS_LEVEL_RO) 

1515 reapply_rules_while_ensuring_shares = True 

1516 

1517 def __init__(self, execute, config_object, **kwargs): 

1518 self.rados_client = kwargs.pop('rados_client') 

1519 self.volname = kwargs.pop('volname') 

1520 self.configured_ip_versions = set() 

1521 self.configuration = config_object 

1522 self._nfs_clusterid = None 

1523 self.export_ips = None 

1524 super(NFSClusterProtocolHelper, self).__init__(execute, 

1525 config_object, 

1526 **kwargs) 

1527 

1528 @property 

1529 def nfs_clusterid(self): 

1530 # ID of the NFS cluster where the driver exports shares 

1531 if self._nfs_clusterid: 

1532 return self._nfs_clusterid 

1533 

1534 self._nfs_clusterid = ( 

1535 self.configuration.safe_get('cephfs_nfs_cluster_id')) 

1536 

1537 if not self._nfs_clusterid: 

1538 msg = _("The NFS Cluster ID has not been configured" 

1539 "Please check cephfs_nfs_cluster_id option " 

1540 "has been correctly set in the backend configuration.") 

1541 raise exception.ShareBackendException(msg=msg) 

1542 

1543 return self._nfs_clusterid 

1544 

1545 def _get_configured_export_ips(self): 

1546 ganesha_server_ips = ( 

1547 self.configuration.safe_get('cephfs_ganesha_export_ips') or []) 

1548 if not ganesha_server_ips: 

1549 ganesha_server_ips = ( 

1550 self.configuration.safe_get('cephfs_ganesha_server_ip')) 

1551 ganesha_server_ips = ( 

1552 [ganesha_server_ips] if ganesha_server_ips else []) 

1553 

1554 return set(ganesha_server_ips) 

1555 

1556 def _get_export_ips(self): 

1557 """Get NFS cluster export ips.""" 

1558 nfs_clusterid = self.nfs_clusterid 

1559 ceph_nfs_export_ips = [] 

1560 ganesha_export_ips = self._get_configured_export_ips() 

1561 argdict = { 

1562 "cluster_id": nfs_clusterid, 

1563 } 

1564 

1565 output = rados_command(self.rados_client, "nfs cluster info", argdict) 

1566 

1567 nfs_cluster_info = json.loads(output) 

1568 

1569 # NFS has been deployed with an ingress 

1570 # we use the VIP for the export ips 

1571 vip = nfs_cluster_info[nfs_clusterid]["virtual_ip"] 

1572 

1573 # there is no VIP, we fallback to NFS cluster ips 

1574 if not vip: 1574 ↛ 1579line 1574 didn't jump to line 1579 because the condition on line 1574 was always true

1575 hosts = nfs_cluster_info[nfs_clusterid]["backend"] 

1576 for host in hosts: 

1577 ceph_nfs_export_ips.append(host["ip"]) 

1578 else: 

1579 ceph_nfs_export_ips.append(vip) 

1580 

1581 # there are no export IPs, there are no NFS servers we can use 

1582 if not ceph_nfs_export_ips: 

1583 msg = _("There are no NFS servers available to use. " 

1584 "Please check the health of your Ceph cluster " 

1585 "and restart the manila share service.") 

1586 raise exception.ShareBackendException(msg=msg) 

1587 

1588 export_ips = [] 

1589 for ip in set(ceph_nfs_export_ips): 

1590 export_ips.append({'ip': ip, 'preferred': True}) 

1591 

1592 # It's possible for deployers to state additional 

1593 # NFS interfaces directly via manila.conf. If they do, 

1594 # these are represented as non-preferred export paths. 

1595 # This is mostly to allow NFS-Ganesha server migrations. 

1596 ganesha_export_ips = (eip for eip in ganesha_export_ips 

1597 if eip not in ceph_nfs_export_ips) 

1598 for ip in ganesha_export_ips: 

1599 export_ips.append({'ip': ip, 'preferred': False}) 

1600 

1601 return export_ips 

1602 

1603 def check_for_setup_error(self): 

1604 """Returns an error if prerequisites aren't met.""" 

1605 return 

1606 

1607 def _get_export_config(self, share, access, sub_name=None): 

1608 """Returns export configuration in JSON-encoded bytes.""" 

1609 pseudo_path = self._get_export_pseudo_path(share, sub_name=sub_name) 

1610 argdict = { 

1611 "cluster_id": self.nfs_clusterid, 

1612 "pseudo_path": pseudo_path 

1613 } 

1614 export = rados_command( 

1615 self.rados_client, "nfs export info", argdict, json_obj=True) 

1616 if export: 

1617 export["clients"] = access 

1618 else: 

1619 export = { 

1620 "path": self._get_export_path(share, sub_name=sub_name), 

1621 "cluster_id": self.nfs_clusterid, 

1622 "pseudo": pseudo_path, 

1623 "squash": "none", 

1624 "security_label": True, 

1625 "fsal": { 

1626 "name": "CEPH", 

1627 "fs_name": self.volname, 

1628 

1629 }, 

1630 "clients": access 

1631 } 

1632 return json.dumps(export).encode('utf-8') 

1633 

1634 def _allow_access(self, share, access, sub_name=None): 

1635 """Allow access to the share.""" 

1636 argdict = { 

1637 "cluster_id": self.nfs_clusterid, 

1638 } 

1639 inbuf = self._get_export_config(share, access, sub_name) 

1640 rados_command(self.rados_client, 

1641 "nfs export apply", argdict, inbuf=inbuf) 

1642 

1643 def _deny_access(self, share, sub_name=None): 

1644 """Deny access to the share.""" 

1645 

1646 argdict = { 

1647 "cluster_id": self.nfs_clusterid, 

1648 "pseudo_path": self._get_export_pseudo_path( 

1649 share, sub_name=sub_name) 

1650 } 

1651 

1652 rados_command(self.rados_client, "nfs export rm", argdict) 

1653 

1654 def update_access(self, context, share, access_rules, add_rules, 

1655 delete_rules, update_rules, share_server=None, 

1656 sub_name=None): 

1657 """Update access rules of share. 

1658 

1659 Creates an export per share. Modifies access rules of shares by 

1660 dynamically updating exports via ceph nfs. 

1661 """ 

1662 rule_state_map = {} 

1663 

1664 wanted_rw_clients, wanted_ro_clients = [], [] 

1665 for rule in access_rules: 

1666 try: 

1667 ganesha_utils.validate_access_rule( 

1668 self.supported_access_types, self.supported_access_levels, 

1669 rule, True) 

1670 except (exception.InvalidShareAccess, 

1671 exception.InvalidShareAccessLevel): 

1672 rule_state_map[rule['id']] = {'state': 'error'} 

1673 continue 

1674 

1675 rule = ganesha_utils.fixup_access_rule(rule) 

1676 if rule['access_level'] == 'rw': 

1677 wanted_rw_clients.append(rule['access_to']) 

1678 elif rule['access_level'] == 'ro': 

1679 wanted_ro_clients.append(rule['access_to']) 

1680 

1681 if access_rules: 

1682 # add or update export 

1683 clients = [] 

1684 if wanted_ro_clients: 

1685 clients.append({ 

1686 'access_type': 'ro', 

1687 'addresses': wanted_ro_clients, 

1688 'squash': 'none' 

1689 }) 

1690 if wanted_rw_clients: 

1691 clients.append({ 

1692 'access_type': 'rw', 

1693 'addresses': wanted_rw_clients, 

1694 'squash': 'none' 

1695 }) 

1696 

1697 if clients: # empty list if no rules passed validation 

1698 self._allow_access(share, clients, sub_name=sub_name) 

1699 else: 

1700 # no clients have access to the share. remove export 

1701 self._deny_access(share, sub_name=sub_name) 

1702 

1703 return rule_state_map 

1704 

1705 def get_backend_info(self, context): 

1706 backend_info = { 

1707 "cephfs_ganesha_export_ips": 

1708 self.configuration.cephfs_ganesha_export_ips, 

1709 "cephfs_ganesha_server_ip": 

1710 self.configuration.cephfs_ganesha_server_ip, 

1711 "cephfs_nfs_cluster_id": self.nfs_clusterid, 

1712 "cephfs_ensure_all_shares_salt": 

1713 self.configuration.cephfs_ensure_all_shares_salt, 

1714 } 

1715 return backend_info