Coverage for manila/share/drivers/zfsonlinux/driver.py: 99%

687 statements  

coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2016 Mirantis Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16""" 

17Module with ZFSonLinux share driver that utilizes ZFS filesystem resources 

18and exports them as shares. 

19""" 

20 

21import math 

22import os 

23import time 

24 

25from oslo_config import cfg 

26from oslo_log import log 

27from oslo_utils import importutils 

28from oslo_utils import strutils 

29from oslo_utils import timeutils 

30 

31from manila.common import constants 

32from manila import exception 

33from manila.i18n import _ 

34from manila.share import configuration 

35from manila.share import driver 

36from manila.share.drivers.zfsonlinux import utils as zfs_utils 

37from manila.share.manager import share_manager_opts # noqa 

38from manila.share import share_types 

39from manila.share import utils as share_utils 

40from manila import utils 

41 

42 

43zfsonlinux_opts = [ 

44 cfg.HostAddressOpt( 

45 "zfs_share_export_ip", 

46 required=True, 

47 help="IP to be added to user-facing export location. Required."), 

48 cfg.HostAddressOpt( 

49 "zfs_service_ip", 

50 required=True, 

51 help="IP to be added to admin-facing export location. Required."), 

52 cfg.ListOpt( 

53 "zfs_zpool_list", 

54 required=True, 

55 help="Specify list of zpools that are allowed to be used by backend. " 

56 "Can contain nested datasets. Examples: " 

57 "Without nested dataset: 'zpool_name'. " 

58 "With nested dataset: 'zpool_name/nested_dataset_name'. " 

59 "Required."), 

60 cfg.ListOpt( 

61 "zfs_dataset_creation_options", 

62         help="List of options that should be applied on each dataset "

63              "creation, if needed. Example: "

64              "compression=gzip,dedup=off. "

65              "Note that for secondary replicas the 'readonly' option will "

66              "always be set to 'on', and for active replicas to 'off'. "

67              "Also, 'quota' will be set equal to the share size. Optional."),

68 cfg.StrOpt( 

69 "zfs_dataset_name_prefix", 

70 default='manila_share_', 

71 help="Prefix to be used in each dataset name. Optional."), 

72 cfg.StrOpt( 

73 "zfs_dataset_snapshot_name_prefix", 

74 default='manila_share_snapshot_', 

75 help="Prefix to be used in each dataset snapshot name. Optional."), 

76 cfg.BoolOpt( 

77 "zfs_use_ssh", 

78 default=False, 

79         help="Whether to use SSH for accessing the ZFS storage host. "

80              "Optional."),

81 cfg.StrOpt( 

82 "zfs_ssh_username", 

83 help="SSH user that will be used in 2 cases: " 

84 "1) By manila-share service in case it is located on different " 

85 "host than its ZFS storage. " 

86 "2) By manila-share services with other ZFS backends that " 

87 "perform replication. " 

88 "It is expected that SSH'ing will be key-based, passwordless. " 

89 "This user should be passwordless sudoer. Optional."), 

90 cfg.StrOpt( 

91 "zfs_ssh_user_password", 

92 secret=True, 

93 help="Password for user that is used for SSH'ing ZFS storage host. " 

94 "Not used for replication operations. They require " 

95 "passwordless SSH access. Optional."), 

96 cfg.StrOpt( 

97 "zfs_ssh_private_key_path", 

98 help="Path to SSH private key that should be used for SSH'ing ZFS " 

99 "storage host. Not used for replication operations. Optional."), 

100 cfg.ListOpt( 

101 "zfs_share_helpers", 

102 default=[ 

103 "NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper", 

104 ], 

105 help="Specify list of share export helpers for ZFS storage. " 

106 "It should look like following: " 

107 "'FOO_protocol=foo.FooClass,BAR_protocol=bar.BarClass'. " 

108 "Required."), 

109 cfg.StrOpt( 

110 "zfs_replica_snapshot_prefix", 

111 default="tmp_snapshot_for_replication_", 

112 help="Set snapshot prefix for usage in ZFS replication. Required."), 

113 cfg.StrOpt( 

114 "zfs_migration_snapshot_prefix", 

115 default="tmp_snapshot_for_share_migration_", 

116 help="Set snapshot prefix for usage in ZFS migration. Required."), 

117] 

118 

119CONF = cfg.CONF 

120CONF.register_opts(zfsonlinux_opts) 

121LOG = log.getLogger(__name__) 
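
These options are consumed from a per-backend stanza in manila.conf. A minimal
sketch of such a stanza, using a hypothetical backend name, addresses and pool
(only the option names and the driver import path come from this module):

[zfsonlinux1]
share_driver = manila.share.drivers.zfsonlinux.driver.ZFSonLinuxShareDriver
share_backend_name = zfsonlinux1
zfs_share_export_ip = 203.0.113.10
zfs_service_ip = 198.51.100.10
zfs_zpool_list = manila_zpool/manila_datasets
zfs_dataset_creation_options = compression=gzip,dedup=off
zfs_share_helpers = NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper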

122 

123 

124def ensure_share_server_not_provided(f): 

125 

126 def wrap(self, context, *args, **kwargs): 

127 server = kwargs.get( 

128 "share_server", kwargs.get("destination_share_server")) 

129 if server: 

130 raise exception.InvalidInput( 

131 reason=_("Share server handling is not available. " 

132 "But 'share_server' was provided. '%s'. " 

133 "Share network should not be used.") % server.get( 

134 "id", server)) 

135 return f(self, context, *args, **kwargs) 

136 

137 return wrap 

138 

139 

140def get_backend_configuration(backend_name): 

141 config_stanzas = CONF.list_all_sections() 

142 if backend_name not in config_stanzas: 

143 msg = _("Could not find backend stanza %(backend_name)s in " 

144 "configuration which is required for share replication and " 

145 "migration. Available stanzas are %(stanzas)s") 

146 params = { 

147 "stanzas": config_stanzas, 

148 "backend_name": backend_name, 

149 } 

150 raise exception.BadConfigurationException(reason=msg % params) 

151 

152 config = configuration.Configuration( 

153 driver.share_opts, config_group=backend_name) 

154 config.append_config_values(zfsonlinux_opts) 

155 config.append_config_values(share_manager_opts) 

156 config.append_config_values(driver.ssh_opts) 

157 

158 return config 

159 

160 

161class ZFSonLinuxShareDriver(zfs_utils.ExecuteMixin, driver.ShareDriver): 

162 

163 def __init__(self, *args, **kwargs): 

164 super(ZFSonLinuxShareDriver, self).__init__( 

165 [False], *args, config_opts=[zfsonlinux_opts], **kwargs) 

166 self.replica_snapshot_prefix = ( 

167 self.configuration.zfs_replica_snapshot_prefix) 

168 self.migration_snapshot_prefix = ( 

169 self.configuration.zfs_migration_snapshot_prefix) 

170 self.backend_name = self.configuration.safe_get( 

171 'share_backend_name') or 'ZFSonLinux' 

172 self.zpool_list = self._get_zpool_list() 

173 self.dataset_creation_options = ( 

174 self.configuration.zfs_dataset_creation_options) 

175 self.share_export_ip = self.configuration.zfs_share_export_ip 

176 self.service_ip = self.configuration.zfs_service_ip 

177 self.private_storage = kwargs.get('private_storage') 

178 self._helpers = {} 

179 

180 # Set config based capabilities 

181 self._init_common_capabilities() 

182 

183 self._shell_executors = {} 

184 

185 def _get_shell_executor_by_host(self, host): 

186 backend_name = share_utils.extract_host(host, level='backend_name') 

187 if backend_name in CONF.enabled_share_backends: 

188 # Return executor of this host 

189 return self.execute 

190 elif backend_name not in self._shell_executors: 

191 config = get_backend_configuration(backend_name) 

192 self._shell_executors[backend_name] = ( 

193 zfs_utils.get_remote_shell_executor( 

194 ip=config.zfs_service_ip, 

195 port=22, 

196 conn_timeout=config.ssh_conn_timeout, 

197 login=config.zfs_ssh_username, 

198 password=config.zfs_ssh_user_password, 

199 privatekey=config.zfs_ssh_private_key_path, 

200 max_size=10, 

201 ) 

202 ) 

203 # Return executor of remote host 

204 return self._shell_executors[backend_name] 

205 

206 def _init_common_capabilities(self): 

207 self.common_capabilities = {} 

208 if 'dedup=on' in self.dataset_creation_options: 

209 self.common_capabilities['dedupe'] = [True] 

210 elif 'dedup=off' in self.dataset_creation_options: 

211 self.common_capabilities['dedupe'] = [False] 

212 else: 

213 self.common_capabilities['dedupe'] = [True, False] 

214 

215 if 'compression=off' in self.dataset_creation_options: 

216 self.common_capabilities['compression'] = [False] 

217 elif any('compression=' in option 

218 for option in self.dataset_creation_options): 

219 self.common_capabilities['compression'] = [True] 

220 else: 

221 self.common_capabilities['compression'] = [True, False] 

222 

223 # NOTE(vponomaryov): Driver uses 'quota' approach for 

224 # ZFS dataset. So, we can consider it as 

225 # 'always thin provisioned' because this driver never reserves 

226 # space for dataset. 

227 self.common_capabilities['thin_provisioning'] = [True] 

228 self.common_capabilities['max_over_subscription_ratio'] = ( 

229 self.configuration.max_over_subscription_ratio) 

230 self.common_capabilities['qos'] = [False] 

231 

232 def _get_zpool_list(self): 

233 zpools = [] 

234 for zpool in self.configuration.zfs_zpool_list: 

235 zpool_name = zpool.split('/')[0] 

236 if zpool_name in zpools: 

237 raise exception.BadConfigurationException( 

238 reason=_("Using the same zpool twice is prohibited. " 

239 "Duplicate is '%(zpool)s'. List of zpools: " 

240 "%(zpool_list)s.") % { 

241 'zpool': zpool, 

242 'zpool_list': ', '.join( 

243 self.configuration.zfs_zpool_list)}) 

244 zpools.append(zpool_name) 

245 return zpools 

246 

247 @zfs_utils.zfs_dataset_synchronized 

248 def _delete_dataset_or_snapshot_with_retry(self, name): 

249 """Attempts to destroy some dataset or snapshot with retries.""" 

250 # NOTE(vponomaryov): it is possible to see 'dataset is busy' error 

251 # under the load. So, we are ok to perform retry in this case. 

252 mountpoint = self.get_zfs_option(name, 'mountpoint') 

253 if '@' not in name: 

254 # NOTE(vponomaryov): check that dataset has no open files. 

255 start_point = time.time() 

256 while time.time() - start_point < 60: 

257 try: 

258 out, err = self.execute('lsof', '-w', mountpoint) 

259 except exception.ProcessExecutionError: 

260 # NOTE(vponomaryov): lsof returns code 1 if search 

261 # didn't give results. 

262 break 

263 LOG.debug("Cannot destroy dataset '%(name)s', it has " 

264                           "open files. Will wait 2 more seconds. "

265 "Out: \n%(out)s", { 

266 'name': name, 'out': out}) 

267 time.sleep(2) 

268 else: 

269 raise exception.ZFSonLinuxException( 

270 msg=_("Could not destroy '%s' dataset, " 

271                           "because it had open files.") % name)

272 

273 @utils.retry(retry_param=exception.ProcessExecutionError, retries=10) 

274 def _zfs_destroy_with_retry(): 

275 """Retry destroying dataset ten times with exponential backoff.""" 

276 # NOTE(bswartz): There appears to be a bug in ZFS when creating and 

277 # destroying datasets concurrently where the filesystem remains 

278 # mounted even though ZFS thinks it's unmounted. The most reliable 

279 # workaround I've found is to force the unmount, then attempt the 

280 # destroy, with short pauses around the unmount. (See bug#1546723) 

281 try: 

282 self.execute('sudo', 'umount', mountpoint) 

283 except exception.ProcessExecutionError: 

284 # Ignore failed umount, it's normal 

285 pass 

286 time.sleep(2) 

287 

288 # NOTE(vponomaryov): Now, when no file usages and mounts of dataset 

289 # exist, destroy dataset. 

290 self.zfs('destroy', '-f', name) 

291 

292 _zfs_destroy_with_retry() 

293 

294 def _setup_helpers(self): 

295         """Sets up share helpers for the ZFS backend."""

296 self._helpers = {} 

297 helpers = self.configuration.zfs_share_helpers 

298 if helpers: 

299 for helper_str in helpers: 

300 share_proto, __, import_str = helper_str.partition('=') 

301 helper = importutils.import_class(import_str) 

302 self._helpers[share_proto.upper()] = helper( 

303 self.configuration) 

304 else: 

305 raise exception.BadConfigurationException( 

306 reason=_( 

307 "No share helpers selected for ZFSonLinux Driver. " 

308 "Please specify using config option 'zfs_share_helpers'.")) 

309 

310 def _get_share_helper(self, share_proto): 

311 """Returns share helper specific for used share protocol.""" 

312 helper = self._helpers.get(share_proto) 

313 if helper: 

314 return helper 

315 else: 

316 raise exception.InvalidShare( 

317 reason=_("Wrong, unsupported or disabled protocol - " 

318 "'%s'.") % share_proto) 

319 

320 def do_setup(self, context): 

321 """Perform basic setup and checks.""" 

322 super(ZFSonLinuxShareDriver, self).do_setup(context) 

323 self._setup_helpers() 

324 for ip in (self.share_export_ip, self.service_ip): 

325 if not utils.is_valid_ip_address(ip, 4): 

326 raise exception.BadConfigurationException( 

327 reason=_("Wrong IP address provided: " 

328                              "%s") % ip)

329 

330 if not self.zpool_list: 

331 raise exception.BadConfigurationException( 

332 reason=_("No zpools specified for usage: " 

333 "%s") % self.zpool_list) 

334 

335 # Make pool mounts shared so that cloned namespaces receive unmounts 

336 # and don't prevent us from unmounting datasets 

337 for zpool in self.configuration.zfs_zpool_list: 

338 self.execute('sudo', 'mount', '--make-rshared', ('/%s' % zpool)) 

339 

340 if self.configuration.zfs_use_ssh: 

341 # Check workability of SSH executor 

342 self.ssh_executor('whoami') 

343 

344 def _get_pools_info(self): 

345 """Returns info about all pools used by backend.""" 

346 pools = [] 

347 for zpool in self.zpool_list: 

348 free_size = self.get_zpool_option(zpool, 'free') 

349 free_size = utils.translate_string_size_to_float(free_size) 

350 total_size = self.get_zpool_option(zpool, 'size') 

351 total_size = utils.translate_string_size_to_float(total_size) 

352 pool = { 

353 'pool_name': zpool, 

354 'total_capacity_gb': float(total_size), 

355 'free_capacity_gb': float(free_size), 

356 'reserved_percentage': 

357 self.configuration.reserved_share_percentage, 

358 'reserved_snapshot_percentage': ( 

359 self.configuration.reserved_share_from_snapshot_percentage 

360 or self.configuration.reserved_share_percentage), 

361 'reserved_share_extend_percentage': ( 

362 self.configuration.reserved_share_extend_percentage 

363 or self.configuration.reserved_share_percentage), 

364 } 

365 pool.update(self.common_capabilities) 

366 if self.configuration.replication_domain: 

367 pool['replication_type'] = 'readable' 

368 pools.append(pool) 

369 return pools 
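
For illustration, one entry of the list returned above might look roughly like
the dict below (capacities and ratios are hypothetical; the capability keys
come from _init_common_capabilities):

example_pool = {
    'pool_name': 'manila_zpool',
    'total_capacity_gb': 100.0,
    'free_capacity_gb': 80.0,
    'reserved_percentage': 0,
    'reserved_snapshot_percentage': 0,
    'reserved_share_extend_percentage': 0,
    'dedupe': [True, False],
    'compression': [True, False],
    'thin_provisioning': [True],
    'max_over_subscription_ratio': 20.0,
    'qos': [False],
    'replication_type': 'readable',  # present only when replication_domain is set
}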

370 

371 def _update_share_stats(self): 

372 """Retrieves share stats info.""" 

373 data = { 

374 'share_backend_name': self.backend_name, 

375 'storage_protocol': 'NFS', 

376 'reserved_percentage': 

377 self.configuration.reserved_share_percentage, 

378 'reserved_snapshot_percentage': ( 

379 self.configuration.reserved_share_from_snapshot_percentage 

380 or self.configuration.reserved_share_percentage), 

381 'reserved_share_extend_percentage': ( 

382 self.configuration.reserved_share_extend_percentage 

383 or self.configuration.reserved_share_percentage), 

384 'snapshot_support': True, 

385 'create_share_from_snapshot_support': True, 

386 'driver_name': 'ZFS', 

387 'pools': self._get_pools_info(), 

388 } 

389 if self.configuration.replication_domain: 

390 data['replication_type'] = 'readable' 

391 super(ZFSonLinuxShareDriver, self)._update_share_stats(data) 

392 

393 def _get_share_name(self, share_id): 

394 """Returns name of dataset used for given share.""" 

395 prefix = self.configuration.zfs_dataset_name_prefix or '' 

396 return prefix + share_id.replace('-', '_') 

397 

398 def _get_snapshot_name(self, snapshot_id): 

399 """Returns name of dataset snapshot used for given share snapshot.""" 

400 prefix = self.configuration.zfs_dataset_snapshot_name_prefix or '' 

401 return prefix + snapshot_id.replace('-', '_') 

402 

403 def _get_dataset_creation_options(self, share, is_readonly=False): 

404 """Returns list of options to be used for dataset creation.""" 

405 options = ['quota=%sG' % share['size']] 

406 extra_specs = share_types.get_extra_specs_from_share(share) 

407 

408 dedupe_set = False 

409 dedupe = extra_specs.get('dedupe') 

410 if dedupe: 

411 dedupe = strutils.bool_from_string( 

412 dedupe.lower().split(' ')[-1], default=dedupe) 

413 if (dedupe in self.common_capabilities['dedupe']): 

414 options.append('dedup=%s' % ('on' if dedupe else 'off')) 

415 dedupe_set = True 

416 else: 

417 raise exception.ZFSonLinuxException(msg=_( 

418 "Cannot use requested '%(requested)s' value of 'dedupe' " 

419 "extra spec. It does not fit allowed value '%(allowed)s' " 

420 "that is configured for backend.") % { 

421 'requested': dedupe, 

422 'allowed': self.common_capabilities['dedupe']}) 

423 

424 compression_set = False 

425 compression_type = extra_specs.get('zfsonlinux:compression') 

426 if compression_type: 

427 if (compression_type == 'off' and 

428 False in self.common_capabilities['compression']): 

429 options.append('compression=off') 

430 compression_set = True 

431 elif (compression_type != 'off' and 

432 True in self.common_capabilities['compression']): 

433 options.append('compression=%s' % compression_type) 

434 compression_set = True 

435 else: 

436 raise exception.ZFSonLinuxException(msg=_( 

437 "Cannot use value '%s' of extra spec " 

438 "'zfsonlinux:compression' because compression is disabled " 

439 "for this backend. Set extra spec 'compression=True' to " 

440 "make scheduler pick up appropriate backend." 

441 ) % compression_type) 

442 

443 for option in self.dataset_creation_options or []: 

444 if any(v in option for v in ( 

445 'readonly', 'sharenfs', 'sharesmb', 'quota')): 

446 continue 

447 if 'dedup' in option and dedupe_set is True: 

448 continue 

449 if 'compression' in option and compression_set is True: 

450 continue 

451 options.append(option) 

452 if is_readonly: 

453 options.append('readonly=on') 

454 else: 

455 options.append('readonly=off') 

456 return options 
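
A worked example of the option assembly above, under assumed (hypothetical)
settings: zfs_dataset_creation_options = ['compression=gzip'], a 10 GiB share,
and a share type carrying the extra specs shown below.

extra_specs = {'dedupe': '<is> True', 'zfsonlinux:compression': 'lz4'}
# The method would then return:
expected_options = ['quota=10G', 'dedup=on', 'compression=lz4', 'readonly=off']
# The configured 'compression=gzip' default is skipped because the extra spec
# already set compression; 'quota' and 'readonly' are always driver-managed.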

457 

458 def _get_dataset_name(self, share): 

459 """Returns name of dataset used for given share.""" 

460 pool_name = share_utils.extract_host(share['host'], level='pool') 

461 

462 # Pick pool with nested dataset name if set up 

463 for pool in self.configuration.zfs_zpool_list: 

464 pool_data = pool.split('/') 

465 if (pool_name == pool_data[0] and len(pool_data) > 1): 

466 pool_name = pool 

467 if pool_name[-1] == '/': 

468 pool_name = pool_name[0:-1] 

469 break 

470 

471 dataset_name = self._get_share_name(share['id']) 

472 full_dataset_name = '%(pool)s/%(dataset)s' % { 

473 'pool': pool_name, 'dataset': dataset_name} 

474 

475 return full_dataset_name 
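
A worked example of the naming scheme above, with a hypothetical share id and
pool and the default 'manila_share_' dataset name prefix:

share = {
    'id': '5abb1e8c-5b63-4f07-a8b0-6f07d7ae0ec2',
    'host': 'some-host@zfsonlinux1#manila_zpool',
}
# _get_share_name(share['id']) ->
#     'manila_share_5abb1e8c_5b63_4f07_a8b0_6f07d7ae0ec2'
# _get_dataset_name(share) ->
#     'manila_zpool/manila_share_5abb1e8c_5b63_4f07_a8b0_6f07d7ae0ec2'
# With zfs_zpool_list = ['manila_zpool/nested'], the pool part would instead
# be 'manila_zpool/nested'.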

476 

477 @ensure_share_server_not_provided 

478 def create_share(self, context, share, share_server=None): 

479 """Is called to create a share.""" 

480 options = self._get_dataset_creation_options(share, is_readonly=False) 

481 cmd = ['create'] 

482 for option in options: 

483 cmd.extend(['-o', option]) 

484 dataset_name = self._get_dataset_name(share) 

485 cmd.append(dataset_name) 

486 

487 ssh_cmd = '%(username)s@%(host)s' % { 

488 'username': self.configuration.zfs_ssh_username, 

489 'host': self.service_ip, 

490 } 

491 pool_name = share_utils.extract_host(share['host'], level='pool') 

492 self.private_storage.update( 

493 share['id'], { 

494 'entity_type': 'share', 

495 'dataset_name': dataset_name, 

496 'ssh_cmd': ssh_cmd, # used with replication and migration 

497 'pool_name': pool_name, # used in replication 

498 'used_options': ' '.join(options), 

499 } 

500 ) 

501 

502 self.zfs(*cmd) 

503 

504 return self._get_share_helper( 

505 share['share_proto']).create_exports(dataset_name) 

506 

507 @ensure_share_server_not_provided 

508 def delete_share(self, context, share, share_server=None): 

509 """Is called to remove a share.""" 

510 pool_name = self.private_storage.get(share['id'], 'pool_name') 

511 pool_name = pool_name or share_utils.extract_host( 

512 share["host"], level="pool") 

513 dataset_name = self.private_storage.get(share['id'], 'dataset_name') 

514 if not dataset_name: 

515 dataset_name = self._get_dataset_name(share) 

516 

517 out, err = self.zfs('list', '-r', pool_name) 

518 data = self.parse_zfs_answer(out) 

519 for datum in data: 

520 if datum['NAME'] != dataset_name: 

521 continue 

522 

523 # Delete dataset's snapshots first 

524 out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name) 

525 snapshots = self.parse_zfs_answer(out) 

526 full_snapshot_prefix = ( 

527 dataset_name + '@') 

528 for snap in snapshots: 

529 if full_snapshot_prefix in snap['NAME']: 

530 self._delete_dataset_or_snapshot_with_retry(snap['NAME']) 

531 

532 self._get_share_helper( 

533 share['share_proto']).remove_exports(dataset_name) 

534 self._delete_dataset_or_snapshot_with_retry(dataset_name) 

535 break 

536 else: 

537 LOG.warning( 

538 "Share with '%(id)s' ID and '%(name)s' NAME is " 

539 "absent on backend. Nothing has been deleted.", 

540 {'id': share['id'], 'name': dataset_name}) 

541 self.private_storage.delete(share['id']) 

542 

543 @ensure_share_server_not_provided 

544 def create_snapshot(self, context, snapshot, share_server=None): 

545 """Is called to create a snapshot.""" 

546 dataset_name = self.private_storage.get( 

547 snapshot['share_instance_id'], 'dataset_name') 

548 snapshot_tag = self._get_snapshot_name(snapshot['id']) 

549 snapshot_name = dataset_name + '@' + snapshot_tag 

550 self.private_storage.update( 

551 snapshot['snapshot_id'], { 

552 'entity_type': 'snapshot', 

553 'snapshot_tag': snapshot_tag, 

554 } 

555 ) 

556 self.zfs('snapshot', snapshot_name) 

557 return {"provider_location": snapshot_name} 

558 

559 @ensure_share_server_not_provided 

560 def delete_snapshot(self, context, snapshot, share_server=None): 

561 """Is called to remove a snapshot.""" 

562 self._delete_snapshot(context, snapshot) 

563 self.private_storage.delete(snapshot['snapshot_id']) 

564 

565 def _get_saved_snapshot_name(self, snapshot_instance): 

566 snapshot_tag = self.private_storage.get( 

567 snapshot_instance['snapshot_id'], 'snapshot_tag') 

568 dataset_name = self.private_storage.get( 

569 snapshot_instance['share_instance_id'], 'dataset_name') 

570 snapshot_name = dataset_name + '@' + snapshot_tag 

571 return snapshot_name 

572 

573 def _delete_snapshot(self, context, snapshot): 

574 snapshot_name = self._get_saved_snapshot_name(snapshot) 

575 out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name) 

576 data = self.parse_zfs_answer(out) 

577 for datum in data: 

578 if datum['NAME'] == snapshot_name: 

579 self._delete_dataset_or_snapshot_with_retry(snapshot_name) 

580 break 

581 else: 

582 LOG.warning( 

583 "Snapshot with '%(id)s' ID and '%(name)s' NAME is " 

584 "absent on backend. Nothing has been deleted.", 

585 {'id': snapshot['id'], 'name': snapshot_name}) 

586 

587 @ensure_share_server_not_provided 

588 def create_share_from_snapshot(self, context, share, snapshot, 

589 share_server=None, parent_share=None): 

590 """Is called to create a share from snapshot.""" 

591 src_backend_name = share_utils.extract_host( 

592 snapshot.share_instance['host'], level='backend_name' 

593 ) 

594 src_snapshot_name = self._get_saved_snapshot_name(snapshot) 

595 dataset_name = self._get_dataset_name(share) 

596 

597 dst_backend_ssh_cmd = '%(username)s@%(host)s' % { 

598 'username': self.configuration.zfs_ssh_username, 

599 'host': self.service_ip, 

600 } 

601 

602 dst_backend_pool_name = share_utils.extract_host(share['host'], 

603 level='pool') 

604 options = self._get_dataset_creation_options(share, is_readonly=False) 

605 

606 self.private_storage.update( 

607 share['id'], { 

608 'entity_type': 'share', 

609 'dataset_name': dataset_name, 

610 'ssh_cmd': dst_backend_ssh_cmd, # used in replication 

611 'pool_name': dst_backend_pool_name, # used in replication 

612 'used_options': options, 

613 } 

614 ) 

615 

616         # NOTE(andrebeltrami): Implements support for creating a share

617         # from a snapshot across different backends on different hosts

618 src_config = get_backend_configuration(src_backend_name) 

619 src_backend_ssh_cmd = '%(username)s@%(host)s' % { 

620 'username': src_config.zfs_ssh_username, 

621 'host': src_config.zfs_service_ip, 

622 } 

623 self.execute( 

624 # NOTE(vponomaryov): SSH is used as workaround for 'execute' 

625 # implementation restriction that does not support usage 

626 # of '|'. 

627 'ssh', src_backend_ssh_cmd, 

628 'sudo', 'zfs', 'send', '-vD', src_snapshot_name, '|', 

629 'ssh', dst_backend_ssh_cmd, 

630 'sudo', 'zfs', 'receive', '-v', dataset_name, 

631 ) 

632 

633         # Apply options based on the share type in use, which may differ

634         # from the one used for the original share.

635 for option in options: 

636 self.zfs('set', option, dataset_name) 

637 

638         # Delete with retry as right after creation it may be temporarily busy.

639 self.execute_with_retry( 

640 'sudo', 'zfs', 'destroy', 

641 dataset_name + '@' + src_snapshot_name.split('@')[-1]) 

642 

643 return self._get_share_helper( 

644 share['share_proto']).create_exports(dataset_name) 

645 

646 def get_pool(self, share): 

647         """Return pool name where the share resides.

648 

649 :param share: The share hosted by the driver. 

650 """ 

651 pool_name = share_utils.extract_host(share['host'], level='pool') 

652 return pool_name 

653 

654 @ensure_share_server_not_provided 

655 def ensure_share(self, context, share, share_server=None): 

656 """Invoked to ensure that given share is exported.""" 

657 dataset_name = self.private_storage.get(share['id'], 'dataset_name') 

658 if not dataset_name: 

659 dataset_name = self._get_dataset_name(share) 

660 

661 pool_name = share_utils.extract_host(share['host'], level='pool') 

662 out, err = self.zfs('list', '-r', pool_name) 

663 data = self.parse_zfs_answer(out) 

664 for datum in data: 

665 if datum['NAME'] == dataset_name: 

666 ssh_cmd = '%(username)s@%(host)s' % { 

667 'username': self.configuration.zfs_ssh_username, 

668 'host': self.service_ip, 

669 } 

670 self.private_storage.update( 

671 share['id'], {'ssh_cmd': ssh_cmd}) 

672 sharenfs = self.get_zfs_option(dataset_name, 'sharenfs') 

673 if sharenfs != 'off': 

674 self.zfs('share', dataset_name) 

675 export_locations = self._get_share_helper( 

676 share['share_proto']).get_exports(dataset_name) 

677 return export_locations 

678 else: 

679 raise exception.ShareResourceNotFound(share_id=share['id']) 

680 

681 def get_network_allocations_number(self): 

682 """ZFS does not handle networking. Return 0.""" 

683 return 0 

684 

685 @ensure_share_server_not_provided 

686 def extend_share(self, share, new_size, share_server=None): 

687 """Extends size of existing share.""" 

688 dataset_name = self._get_dataset_name(share) 

689 self.zfs('set', 'quota=%sG' % new_size, dataset_name) 

690 

691 @ensure_share_server_not_provided 

692 def shrink_share(self, share, new_size, share_server=None): 

693 """Shrinks size of existing share.""" 

694 dataset_name = self._get_dataset_name(share) 

695 consumed_space = self.get_zfs_option(dataset_name, 'used') 

696 consumed_space = utils.translate_string_size_to_float(consumed_space) 

697 if consumed_space >= new_size: 

698 raise exception.ShareShrinkingPossibleDataLoss( 

699 share_id=share['id']) 

700 self.zfs('set', 'quota=%sG' % new_size, dataset_name) 

701 

702 @ensure_share_server_not_provided 

703 def update_access(self, context, share, access_rules, add_rules, 

704 delete_rules, update_rules, share_server=None): 

705 """Updates access rules for given share.""" 

706 dataset_name = self._get_dataset_name(share) 

707 executor = self._get_shell_executor_by_host(share['host']) 

708 return self._get_share_helper(share['share_proto']).update_access( 

709 dataset_name, access_rules, add_rules, delete_rules, 

710 executor=executor) 

711 

712 def manage_existing(self, share, driver_options): 

713 """Manage existing ZFS dataset as manila share. 

714 

715 ZFSonLinux driver accepts only one driver_option 'size'. 

716        If an administrator provides this option, that quota will be set on

717        the dataset and used as the share size. Otherwise, the driver will

718        set the quota to the used size rounded up to the next integer.

719        The driver does not expect the mountpoint to have been changed

720        (it should be the default, "/%(dataset_name)s").

721 

722 :param share: share data 

723 :param driver_options: Empty dict or dict with 'size' option. 

724 :return: dict with share size and its export locations. 

725 """ 

726 old_export_location = share["export_locations"][0]["path"] 

727 old_dataset_name = old_export_location.split(":/")[-1] 

728 

729 scheduled_pool_name = share_utils.extract_host( 

730 share["host"], level="pool") 

731 actual_pool_name = old_dataset_name.split("/")[0] 

732 

733 new_dataset_name = self._get_dataset_name(share) 

734 

735 # Calculate quota for managed dataset 

736 quota = driver_options.get("size") 

737 if not quota: 

738 consumed_space = self.get_zfs_option(old_dataset_name, "used") 

739 consumed_space = utils.translate_string_size_to_float( 

740 consumed_space) 

741 quota = int(consumed_space) + 1 

742 share["size"] = int(quota) 

743 

744 # Save dataset-specific data in private storage 

745 options = self._get_dataset_creation_options(share, is_readonly=False) 

746 ssh_cmd = "%(username)s@%(host)s" % { 

747 "username": self.configuration.zfs_ssh_username, 

748 "host": self.service_ip, 

749 } 

750 

751 # Perform checks on requested dataset 

752 if actual_pool_name != scheduled_pool_name: 

753 raise exception.ZFSonLinuxException( 

754 _("Cannot manage share '%(share_id)s' " 

755 "(share_instance '%(si_id)s'), because scheduled " 

756 "pool '%(sch)s' and actual '%(actual)s' differ.") % { 

757 "share_id": share["share_id"], 

758 "si_id": share["id"], 

759 "sch": scheduled_pool_name, 

760 "actual": actual_pool_name}) 

761 

762 out, err = self.zfs("list", "-r", actual_pool_name) 

763 data = self.parse_zfs_answer(out) 

764 for datum in data: 

765 if datum["NAME"] == old_dataset_name: 

766 break 

767 else: 

768 raise exception.ZFSonLinuxException( 

769 _("Cannot manage share '%(share_id)s' " 

770 "(share_instance '%(si_id)s'), because dataset " 

771 "'%(dataset)s' not found in zpool '%(zpool)s'.") % { 

772 "share_id": share["share_id"], 

773 "si_id": share["id"], 

774 "dataset": old_dataset_name, 

775 "zpool": actual_pool_name}) 

776 

777 # Unmount the dataset before attempting to rename and mount 

778 try: 

779 self._unmount_share_with_retry(old_dataset_name) 

780 except exception.ZFSonLinuxException: 

781 msg = _("Unable to unmount share before renaming and re-mounting.") 

782 raise exception.ZFSonLinuxException(message=msg) 

783 

784 # Rename the dataset and mount with new name 

785 self.zfs_with_retry("rename", old_dataset_name, new_dataset_name) 

786 

787 try: 

788 self.zfs("mount", new_dataset_name) 

789 except exception.ProcessExecutionError: 

790 # Workaround for bug/1785180 

791 out, err = self.zfs("mount") 

792 mounted = any([new_dataset_name in mountedfs 

793 for mountedfs in out.splitlines()]) 

794 if not mounted: 

795 raise 

796 

797 # Apply options to dataset 

798 for option in options: 

799 self.zfs("set", option, new_dataset_name) 

800 

801 # Get new export locations of renamed dataset 

802 export_locations = self._get_share_helper( 

803 share["share_proto"]).get_exports(new_dataset_name) 

804 

805 self.private_storage.update( 

806 share["id"], { 

807 "entity_type": "share", 

808 "dataset_name": new_dataset_name, 

809 "ssh_cmd": ssh_cmd, # used in replication 

810 "pool_name": actual_pool_name, # used in replication 

811 "used_options": " ".join(options), 

812 } 

813 ) 

814 

815 return {"size": share["size"], "export_locations": export_locations} 

816 

817 def unmanage(self, share): 

818 """Removes the specified share from Manila management.""" 

819 self.private_storage.delete(share['id']) 

820 

821 def manage_existing_snapshot(self, snapshot_instance, driver_options): 

822 """Manage existing share snapshot with manila. 

823 

824 :param snapshot_instance: SnapshotInstance data 

825 :param driver_options: expects only one optional key 'size'. 

826 :return: dict with share snapshot instance fields for update, example:: 

827 

828 { 

829 

830 'size': 1, 

831 'provider_location': 'path/to/some/dataset@some_snapshot_tag', 

832 

833 } 

834 

835 """ 

836 snapshot_size = int(driver_options.get("size", 0)) 

837 old_provider_location = snapshot_instance.get("provider_location") 

838 old_snapshot_tag = old_provider_location.split("@")[-1] 

839 new_snapshot_tag = self._get_snapshot_name(snapshot_instance["id"]) 

840 

841 self.private_storage.update( 

842 snapshot_instance["snapshot_id"], { 

843 "entity_type": "snapshot", 

844 "old_snapshot_tag": old_snapshot_tag, 

845 "snapshot_tag": new_snapshot_tag, 

846 } 

847 ) 

848 

849 try: 

850 self.zfs("list", "-r", "-t", "snapshot", old_provider_location) 

851 except exception.ProcessExecutionError as e: 

852 raise exception.ManageInvalidShareSnapshot(reason=e.stderr) 

853 

854 if not snapshot_size: 

855 consumed_space = self.get_zfs_option(old_provider_location, "used") 

856 consumed_space = utils.translate_string_size_to_float( 

857 consumed_space) 

858 snapshot_size = int(math.ceil(consumed_space)) 

859 

860 dataset_name = self.private_storage.get( 

861 snapshot_instance["share_instance_id"], "dataset_name") 

862 new_provider_location = dataset_name + "@" + new_snapshot_tag 

863 

864 self.zfs("rename", old_provider_location, new_provider_location) 

865 

866 return { 

867 "size": snapshot_size, 

868 "provider_location": new_provider_location, 

869 } 

870 

871 def unmanage_snapshot(self, snapshot_instance): 

872 """Unmanage dataset snapshot.""" 

873 self.private_storage.delete(snapshot_instance["snapshot_id"]) 

874 

875 @utils.retry(retry_param=exception.ZFSonLinuxException, retries=10) 

876 def _unmount_share_with_retry(self, share_name): 

877 out, err = self.execute("sudo", "mount") 

878 if "%s " % share_name not in out: 

879 return 

880 self.zfs_with_retry("umount", "-f", share_name) 

881 out, err = self.execute("sudo", "mount") 

882 if "%s " % share_name in out: 

883 raise exception.ZFSonLinuxException( 

884                 _("Unable to unmount dataset %s") % share_name)

885 

886 def _get_replication_snapshot_prefix(self, replica): 

887 """Returns replica-based snapshot prefix.""" 

888 replication_snapshot_prefix = "%s_%s" % ( 

889 self.replica_snapshot_prefix, replica['id'].replace('-', '_')) 

890 return replication_snapshot_prefix 

891 

892 def _get_replication_snapshot_tag(self, replica): 

893 """Returns replica- and time-based snapshot tag.""" 

894 current_time = timeutils.utcnow().isoformat() 

895 snapshot_tag = "%s_time_%s" % ( 

896 self._get_replication_snapshot_prefix(replica), current_time) 

897 return snapshot_tag 

898 

899 def _get_active_replica(self, replica_list): 

900 for replica in replica_list: 

901 if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE: 

902 return replica 

903 msg = _("Active replica not found.") 

904 raise exception.ReplicationException(reason=msg) 

905 

906 def _get_migration_snapshot_prefix(self, share_instance): 

907 """Returns migration-based snapshot prefix.""" 

908 migration_snapshot_prefix = "%s_%s" % ( 

909 self.migration_snapshot_prefix, 

910 share_instance['id'].replace('-', '_')) 

911 return migration_snapshot_prefix 

912 

913 def _get_migration_snapshot_tag(self, share_instance): 

914 """Returns migration- and time-based snapshot tag.""" 

915 current_time = timeutils.utcnow().isoformat() 

916 snapshot_tag = "%s_time_%s" % ( 

917 self._get_migration_snapshot_prefix(share_instance), current_time) 

918 snapshot_tag = ( 

919 snapshot_tag.replace('-', '_').replace('.', '_').replace(':', '_')) 

920 return snapshot_tag 

921 

922 @ensure_share_server_not_provided 

923 def create_replica(self, context, replica_list, new_replica, 

924 access_rules, replica_snapshots, share_server=None): 

925 """Replicates the active replica to a new replica on this backend.""" 

926 active_replica = self._get_active_replica(replica_list) 

927 src_dataset_name = self.private_storage.get( 

928 active_replica['id'], 'dataset_name') 

929 ssh_to_src_cmd = self.private_storage.get( 

930 active_replica['id'], 'ssh_cmd') 

931 dst_dataset_name = self._get_dataset_name(new_replica) 

932 

933 ssh_cmd = '%(username)s@%(host)s' % { 

934 'username': self.configuration.zfs_ssh_username, 

935 'host': self.service_ip, 

936 } 

937 

938 snapshot_tag = self._get_replication_snapshot_tag(new_replica) 

939 src_snapshot_name = ( 

940 '%(dataset_name)s@%(snapshot_tag)s' % { 

941 'snapshot_tag': snapshot_tag, 

942 'dataset_name': src_dataset_name, 

943 } 

944 ) 

945 # Save valuable data to DB 

946 self.private_storage.update(active_replica['id'], { 

947 'repl_snapshot_tag': snapshot_tag, 

948 }) 

949 self.private_storage.update(new_replica['id'], { 

950 'entity_type': 'replica', 

951 'replica_type': 'readable', 

952 'dataset_name': dst_dataset_name, 

953 'ssh_cmd': ssh_cmd, 

954 'pool_name': share_utils.extract_host( 

955 new_replica['host'], level='pool'), 

956 'repl_snapshot_tag': snapshot_tag, 

957 }) 

958 

959         # Create a temporary snapshot. It will exist until the following

960         # replica sync; after that a newer one replaces it, and so on in a loop.

961 self.execute( 

962 'ssh', ssh_to_src_cmd, 

963 'sudo', 'zfs', 'snapshot', src_snapshot_name, 

964 ) 

965 

966 # Send/receive temporary snapshot 

967 out, err = self.execute( 

968 'ssh', ssh_to_src_cmd, 

969 'sudo', 'zfs', 'send', '-vDR', src_snapshot_name, '|', 

970 'ssh', ssh_cmd, 

971 'sudo', 'zfs', 'receive', '-v', dst_dataset_name, 

972 ) 

973         msg = ("Info about replica '%(replica_id)s' creation is as follows: "

974 "\n%(out)s") 

975 LOG.debug(msg, {'replica_id': new_replica['id'], 'out': out}) 

976 

977 # Make replica readonly 

978 self.zfs('set', 'readonly=on', dst_dataset_name) 

979 

980 # Set original share size as quota to new replica 

981 self.zfs('set', 'quota=%sG' % active_replica['size'], dst_dataset_name) 

982 

983 # Apply access rules from original share 

984 self._get_share_helper(new_replica['share_proto']).update_access( 

985 dst_dataset_name, access_rules, add_rules=[], delete_rules=[], 

986 make_all_ro=True) 

987 

988 return { 

989 'export_locations': self._get_share_helper( 

990 new_replica['share_proto']).create_exports(dst_dataset_name), 

991 'replica_state': constants.REPLICA_STATE_IN_SYNC, 

992 'access_rules_status': constants.STATUS_ACTIVE, 

993 } 

994 

995 @ensure_share_server_not_provided 

996 def delete_replica(self, context, replica_list, replica_snapshots, replica, 

997 share_server=None): 

998 """Deletes a replica. This is called on the destination backend.""" 

999 pool_name = self.private_storage.get(replica['id'], 'pool_name') 

1000 dataset_name = self.private_storage.get(replica['id'], 'dataset_name') 

1001 if not dataset_name: 

1002 dataset_name = self._get_dataset_name(replica) 

1003 

1004 # Delete dataset's snapshots first 

1005 out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name) 

1006 data = self.parse_zfs_answer(out) 

1007 for datum in data: 

1008 if dataset_name in datum['NAME']: 

1009 self._delete_dataset_or_snapshot_with_retry(datum['NAME']) 

1010 

1011 # Now we delete dataset itself 

1012 out, err = self.zfs('list', '-r', pool_name) 

1013 data = self.parse_zfs_answer(out) 

1014 for datum in data: 

1015 if datum['NAME'] == dataset_name: 

1016 self._get_share_helper( 

1017 replica['share_proto']).remove_exports(dataset_name) 

1018 self._delete_dataset_or_snapshot_with_retry(dataset_name) 

1019 break 

1020 else: 

1021 LOG.warning( 

1022 "Share replica with '%(id)s' ID and '%(name)s' NAME is " 

1023 "absent on backend. Nothing has been deleted.", 

1024 {'id': replica['id'], 'name': dataset_name}) 

1025 self.private_storage.delete(replica['id']) 

1026 

1027 @ensure_share_server_not_provided 

1028 def update_replica_state(self, context, replica_list, replica, 

1029 access_rules, replica_snapshots, 

1030 share_server=None): 

1031 """Syncs replica and updates its 'replica_state'.""" 

1032 return self._update_replica_state( 

1033 context, replica_list, replica, replica_snapshots, access_rules) 

1034 

1035 def _update_replica_state(self, context, replica_list, replica, 

1036 replica_snapshots=None, access_rules=None): 

1037 active_replica = self._get_active_replica(replica_list) 

1038 src_dataset_name = self.private_storage.get( 

1039 active_replica['id'], 'dataset_name') 

1040 ssh_to_src_cmd = self.private_storage.get( 

1041 active_replica['id'], 'ssh_cmd') 

1042 ssh_to_dst_cmd = self.private_storage.get( 

1043 replica['id'], 'ssh_cmd') 

1044 dst_dataset_name = self.private_storage.get( 

1045 replica['id'], 'dataset_name') 

1046 

1047 # Create temporary snapshot 

1048 previous_snapshot_tag = self.private_storage.get( 

1049 replica['id'], 'repl_snapshot_tag') 

1050 snapshot_tag = self._get_replication_snapshot_tag(replica) 

1051 src_snapshot_name = src_dataset_name + '@' + snapshot_tag 

1052 self.execute( 

1053 'ssh', ssh_to_src_cmd, 

1054 'sudo', 'zfs', 'snapshot', src_snapshot_name, 

1055 ) 

1056 

1057 # Make sure it is readonly 

1058 self.zfs('set', 'readonly=on', dst_dataset_name) 

1059 

1060 # Send/receive diff between previous snapshot and last one 

1061 out, err = self.execute( 

1062 'ssh', ssh_to_src_cmd, 

1063 'sudo', 'zfs', 'send', '-vDRI', 

1064 previous_snapshot_tag, src_snapshot_name, '|', 

1065 'ssh', ssh_to_dst_cmd, 

1066 'sudo', 'zfs', 'receive', '-vF', dst_dataset_name, 

1067 ) 

1068         msg = ("Info about last replica '%(replica_id)s' sync is as follows: "

1069 "\n%(out)s") 

1070 LOG.debug(msg, {'replica_id': replica['id'], 'out': out}) 

1071 

1072 # Update DB data that will be used on following replica sync 

1073 self.private_storage.update(active_replica['id'], { 

1074 'repl_snapshot_tag': snapshot_tag, 

1075 }) 

1076 self.private_storage.update( 

1077 replica['id'], {'repl_snapshot_tag': snapshot_tag}) 

1078 

1079 # Destroy all snapshots on dst filesystem except referenced ones. 

1080 snap_references = set() 

1081 for repl in replica_list: 

1082 snap_references.add( 

1083 self.private_storage.get(repl['id'], 'repl_snapshot_tag')) 

1084 

1085 dst_pool_name = dst_dataset_name.split('/')[0] 

1086 out, err = self.zfs('list', '-r', '-t', 'snapshot', dst_pool_name) 

1087 data = self.parse_zfs_answer(out) 

1088 for datum in data: 

1089 if (dst_dataset_name in datum['NAME'] and 

1090 '@' + self.replica_snapshot_prefix in datum['NAME'] and 

1091 datum['NAME'].split('@')[-1] not in snap_references): 

1092 self._delete_dataset_or_snapshot_with_retry(datum['NAME']) 

1093 

1094 # Destroy all snapshots on src filesystem except referenced ones. 

1095 src_pool_name = src_snapshot_name.split('/')[0] 

1096 out, err = self.execute( 

1097 'ssh', ssh_to_src_cmd, 

1098 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', src_pool_name, 

1099 ) 

1100 data = self.parse_zfs_answer(out) 

1101 full_src_snapshot_prefix = ( 

1102 src_dataset_name + '@' + 

1103 self._get_replication_snapshot_prefix(replica)) 

1104 for datum in data: 

1105 if (full_src_snapshot_prefix in datum['NAME'] and 

1106 datum['NAME'].split('@')[-1] not in snap_references): 

1107 self.execute_with_retry( 

1108 'ssh', ssh_to_src_cmd, 

1109 'sudo', 'zfs', 'destroy', '-f', datum['NAME'], 

1110 ) 

1111 

1112         if access_rules:    1112 ↛ 1123   line 1112 didn't jump to line 1123 because the condition on line 1112 was always true

1113 # Apply access rules from original share 

1114             # TODO(vponomaryov): we should somehow remove rules that were

1115             # deleted on the active replica after the secondary replica was

1116             # created. For the moment there will be a difference, which

1117             # can be considered a bug.

1118 self._get_share_helper(replica['share_proto']).update_access( 

1119 dst_dataset_name, access_rules, add_rules=[], delete_rules=[], 

1120 make_all_ro=True) 

1121 

1122 # Return results 

1123 return constants.REPLICA_STATE_IN_SYNC 

1124 

1125 @ensure_share_server_not_provided 

1126 def promote_replica(self, context, replica_list, replica, access_rules, 

1127 share_server=None, quiesce_wait_time=None): 

1128 """Promotes secondary replica to active and active to secondary.""" 

1129 active_replica = self._get_active_replica(replica_list) 

1130 src_dataset_name = self.private_storage.get( 

1131 active_replica['id'], 'dataset_name') 

1132 ssh_to_src_cmd = self.private_storage.get( 

1133 active_replica['id'], 'ssh_cmd') 

1134 dst_dataset_name = self.private_storage.get( 

1135 replica['id'], 'dataset_name') 

1136 replica_dict = { 

1137 r['id']: { 

1138 'id': r['id'], 

1139 # NOTE(vponomaryov): access rules will be updated in next 

1140 # 'sync' operation. 

1141 'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING, 

1142 } 

1143 for r in replica_list 

1144 } 

1145 try: 

1146 # Mark currently active replica as readonly 

1147 self.execute( 

1148 'ssh', ssh_to_src_cmd, 

1149                 'sudo', 'zfs', 'set', 'readonly=on', src_dataset_name,

1150 ) 

1151 

1152 # Create temporary snapshot of currently active replica 

1153 snapshot_tag = self._get_replication_snapshot_tag(active_replica) 

1154 src_snapshot_name = src_dataset_name + '@' + snapshot_tag 

1155 self.execute( 

1156 'ssh', ssh_to_src_cmd, 

1157 'sudo', 'zfs', 'snapshot', src_snapshot_name, 

1158 ) 

1159 

1160 # Apply temporary snapshot to all replicas 

1161 for repl in replica_list: 

1162 if repl['replica_state'] == constants.REPLICA_STATE_ACTIVE: 

1163 continue 

1164 previous_snapshot_tag = self.private_storage.get( 

1165 repl['id'], 'repl_snapshot_tag') 

1166 dataset_name = self.private_storage.get( 

1167 repl['id'], 'dataset_name') 

1168 ssh_to_dst_cmd = self.private_storage.get( 

1169 repl['id'], 'ssh_cmd') 

1170 

1171 try: 

1172 # Send/receive diff between previous snapshot and last one 

1173 out, err = self.execute( 

1174 'ssh', ssh_to_src_cmd, 

1175 'sudo', 'zfs', 'send', '-vDRI', 

1176 previous_snapshot_tag, src_snapshot_name, '|', 

1177 'ssh', ssh_to_dst_cmd, 

1178 'sudo', 'zfs', 'receive', '-vF', dataset_name, 

1179 ) 

1180 except exception.ProcessExecutionError as e: 

1181 LOG.warning("Failed to sync replica %(id)s. %(e)s", 

1182 {'id': repl['id'], 'e': e}) 

1183 replica_dict[repl['id']]['replica_state'] = ( 

1184 constants.REPLICA_STATE_OUT_OF_SYNC) 

1185 continue 

1186 

1187 msg = ("Info about last replica '%(replica_id)s' " 

1188                    "sync is as follows: \n%(out)s")

1189 LOG.debug(msg, {'replica_id': repl['id'], 'out': out}) 

1190 

1191 # Update latest replication snapshot for replica 

1192 self.private_storage.update( 

1193 repl['id'], {'repl_snapshot_tag': snapshot_tag}) 

1194 

1195 # Update latest replication snapshot for currently active replica 

1196 self.private_storage.update( 

1197 active_replica['id'], {'repl_snapshot_tag': snapshot_tag}) 

1198 

1199 replica_dict[active_replica['id']]['replica_state'] = ( 

1200 constants.REPLICA_STATE_IN_SYNC) 

1201 except Exception as e: 

1202 LOG.warning( 

1203 "Failed to update currently active replica. \n%s", e) 

1204 

1205 replica_dict[active_replica['id']]['replica_state'] = ( 

1206 constants.REPLICA_STATE_OUT_OF_SYNC) 

1207 

1208 # Create temporary snapshot of new replica and sync it with other 

1209 # secondary replicas. 

1210 snapshot_tag = self._get_replication_snapshot_tag(replica) 

1211 src_snapshot_name = dst_dataset_name + '@' + snapshot_tag 

1212 ssh_to_src_cmd = self.private_storage.get(replica['id'], 'ssh_cmd') 

1213 self.zfs('snapshot', src_snapshot_name) 

1214 for repl in replica_list: 

1215 if (repl['replica_state'] == constants.REPLICA_STATE_ACTIVE or 

1216 repl['id'] == replica['id']): 

1217 continue 

1218 previous_snapshot_tag = self.private_storage.get( 

1219 repl['id'], 'repl_snapshot_tag') 

1220 dataset_name = self.private_storage.get( 

1221 repl['id'], 'dataset_name') 

1222 ssh_to_dst_cmd = self.private_storage.get( 

1223 repl['id'], 'ssh_cmd') 

1224 

1225 try: 

1226 # Send/receive diff between previous snapshot and last one 

1227 out, err = self.execute( 

1228 'ssh', ssh_to_src_cmd, 

1229 'sudo', 'zfs', 'send', '-vDRI', 

1230 previous_snapshot_tag, src_snapshot_name, '|', 

1231 'ssh', ssh_to_dst_cmd, 

1232 'sudo', 'zfs', 'receive', '-vF', dataset_name, 

1233 ) 

1234 except exception.ProcessExecutionError as e: 

1235 LOG.warning("Failed to sync replica %(id)s. %(e)s", 

1236 {'id': repl['id'], 'e': e}) 

1237 replica_dict[repl['id']]['replica_state'] = ( 

1238 constants.REPLICA_STATE_OUT_OF_SYNC) 

1239 continue 

1240 

1241 msg = ("Info about last replica '%(replica_id)s' " 

1242                    "sync is as follows: \n%(out)s")

1243 LOG.debug(msg, {'replica_id': repl['id'], 'out': out}) 

1244 

1245 # Update latest replication snapshot for replica 

1246 self.private_storage.update( 

1247 repl['id'], {'repl_snapshot_tag': snapshot_tag}) 

1248 

1249 # Update latest replication snapshot for new active replica 

1250 self.private_storage.update( 

1251 replica['id'], {'repl_snapshot_tag': snapshot_tag}) 

1252 

1253 replica_dict[replica['id']]['replica_state'] = ( 

1254 constants.REPLICA_STATE_ACTIVE) 

1255 

1256 self._get_share_helper(replica['share_proto']).update_access( 

1257 dst_dataset_name, access_rules, add_rules=[], delete_rules=[]) 

1258 

1259 replica_dict[replica['id']]['access_rules_status'] = ( 

1260 constants.STATUS_ACTIVE) 

1261 

1262 self.zfs('set', 'readonly=off', dst_dataset_name) 

1263 

1264 return list(replica_dict.values()) 

1265 

1266 @ensure_share_server_not_provided 

1267 def create_replicated_snapshot(self, context, replica_list, 

1268 replica_snapshots, share_server=None): 

1269 """Create a snapshot and update across the replicas.""" 

1270 active_replica = self._get_active_replica(replica_list) 

1271 src_dataset_name = self.private_storage.get( 

1272 active_replica['id'], 'dataset_name') 

1273 ssh_to_src_cmd = self.private_storage.get( 

1274 active_replica['id'], 'ssh_cmd') 

1275 replica_snapshots_dict = { 

1276 si['id']: {'id': si['id']} for si in replica_snapshots} 

1277 

1278 active_snapshot_instance_id = [ 

1279 si['id'] for si in replica_snapshots 

1280 if si['share_instance_id'] == active_replica['id']][0] 

1281 snapshot_tag = self._get_snapshot_name(active_snapshot_instance_id) 

1282 # Replication should not be dependent on manually created snapshots 

1283 # so, create additional one, newer, that will be used for replication 

1284 # synchronizations. 

1285 repl_snapshot_tag = self._get_replication_snapshot_tag(active_replica) 

1286 src_snapshot_name = src_dataset_name + '@' + repl_snapshot_tag 

1287 

1288 self.private_storage.update( 

1289 replica_snapshots[0]['snapshot_id'], { 

1290 'entity_type': 'snapshot', 

1291 'snapshot_tag': snapshot_tag, 

1292 } 

1293 ) 

1294 for tag in (snapshot_tag, repl_snapshot_tag): 

1295 self.execute( 

1296 'ssh', ssh_to_src_cmd, 

1297 'sudo', 'zfs', 'snapshot', src_dataset_name + '@' + tag, 

1298 ) 

1299 

1300 # Populate snapshot to all replicas 

1301 for replica_snapshot in replica_snapshots: 

1302 replica_id = replica_snapshot['share_instance_id'] 

1303 if replica_id == active_replica['id']: 

1304 replica_snapshots_dict[replica_snapshot['id']]['status'] = ( 

1305 constants.STATUS_AVAILABLE) 

1306 continue 

1307 previous_snapshot_tag = self.private_storage.get( 

1308 replica_id, 'repl_snapshot_tag') 

1309 dst_dataset_name = self.private_storage.get( 

1310 replica_id, 'dataset_name') 

1311 ssh_to_dst_cmd = self.private_storage.get(replica_id, 'ssh_cmd') 

1312 

1313 try: 

1314 # Send/receive diff between previous snapshot and last one 

1315 out, err = self.execute( 

1316 'ssh', ssh_to_src_cmd, 

1317 'sudo', 'zfs', 'send', '-vDRI', 

1318 previous_snapshot_tag, src_snapshot_name, '|', 

1319 'ssh', ssh_to_dst_cmd, 

1320 'sudo', 'zfs', 'receive', '-vF', dst_dataset_name, 

1321 ) 

1322 except exception.ProcessExecutionError as e: 

1323 LOG.warning( 

1324 "Failed to sync snapshot instance %(id)s. %(e)s", 

1325 {'id': replica_snapshot['id'], 'e': e}) 

1326 replica_snapshots_dict[replica_snapshot['id']]['status'] = ( 

1327 constants.STATUS_ERROR) 

1328 continue 

1329 

1330 replica_snapshots_dict[replica_snapshot['id']]['status'] = ( 

1331 constants.STATUS_AVAILABLE) 

1332 

1333 msg = ("Info about last replica '%(replica_id)s' " 

1334                    "sync is as follows: \n%(out)s")

1335 LOG.debug(msg, {'replica_id': replica_id, 'out': out}) 

1336 

1337 # Update latest replication snapshot for replica 

1338 self.private_storage.update( 

1339 replica_id, {'repl_snapshot_tag': repl_snapshot_tag}) 

1340 

1341 # Update latest replication snapshot for currently active replica 

1342 self.private_storage.update( 

1343 active_replica['id'], {'repl_snapshot_tag': repl_snapshot_tag}) 

1344 

1345 return list(replica_snapshots_dict.values()) 

1346 

1347 @ensure_share_server_not_provided 

1348 def delete_replicated_snapshot(self, context, replica_list, 

1349 replica_snapshots, share_server=None): 

1350 """Delete a snapshot by deleting its instances across the replicas.""" 

1351 active_replica = self._get_active_replica(replica_list) 

1352 replica_snapshots_dict = { 

1353 si['id']: {'id': si['id']} for si in replica_snapshots} 

1354 

1355 for replica_snapshot in replica_snapshots: 

1356 replica_id = replica_snapshot['share_instance_id'] 

1357 snapshot_name = self._get_saved_snapshot_name(replica_snapshot) 

1358 if active_replica['id'] == replica_id: 

1359 self._delete_snapshot(context, replica_snapshot) 

1360 replica_snapshots_dict[replica_snapshot['id']]['status'] = ( 

1361 constants.STATUS_DELETED) 

1362 continue 

1363 ssh_cmd = self.private_storage.get(replica_id, 'ssh_cmd') 

1364 out, err = self.execute( 

1365 'ssh', ssh_cmd, 

1366 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', snapshot_name, 

1367 ) 

1368 data = self.parse_zfs_answer(out) 

1369 for datum in data: 

1370                 if datum['NAME'] != snapshot_name:    1370 ↛ 1372   line 1370 didn't jump to line 1372 because the condition on line 1370 was always true

1371 continue 

1372 self.execute_with_retry( 

1373 'ssh', ssh_cmd, 

1374 'sudo', 'zfs', 'destroy', '-f', datum['NAME'], 

1375 ) 

1376 

1377 self.private_storage.delete(replica_snapshot['id']) 

1378 replica_snapshots_dict[replica_snapshot['id']]['status'] = ( 

1379 constants.STATUS_DELETED) 

1380 

1381 self.private_storage.delete(replica_snapshot['snapshot_id']) 

1382 return list(replica_snapshots_dict.values()) 

1383 

1384 @ensure_share_server_not_provided 

1385 def update_replicated_snapshot(self, context, replica_list, 

1386 share_replica, replica_snapshots, 

1387 replica_snapshot, share_server=None): 

1388 """Update the status of a snapshot instance that lives on a replica.""" 

1389 

1390 self._update_replica_state(context, replica_list, share_replica) 

1391 

1392 snapshot_name = self._get_saved_snapshot_name(replica_snapshot) 

1393 

1394 out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name) 

1395 data = self.parse_zfs_answer(out) 

1396 snapshot_found = False 

1397 for datum in data: 

1398 if datum['NAME'] == snapshot_name: 

1399 snapshot_found = True 

1400 break 

1401 return_dict = {'id': replica_snapshot['id']} 

1402 if snapshot_found: 

1403 return_dict.update({'status': constants.STATUS_AVAILABLE}) 

1404 else: 

1405 return_dict.update({'status': constants.STATUS_ERROR}) 

1406 

1407 return return_dict 

1408 

1409 @ensure_share_server_not_provided 

1410 def migration_check_compatibility( 

1411 self, context, source_share, destination_share, 

1412 share_server=None, destination_share_server=None): 

1413 """Is called to test compatibility with destination backend.""" 

1414 backend_name = share_utils.extract_host( 

1415 destination_share['host'], level='backend_name') 

1416 config = get_backend_configuration(backend_name) 

1417 compatible = self.configuration.share_driver == config.share_driver 

1418 return { 

1419 'compatible': compatible, 

1420 'writable': False, 

1421 'preserve_metadata': True, 

1422 'nondisruptive': True, 

1423 } 

1424 

1425 @ensure_share_server_not_provided 

1426 def migration_start( 

1427 self, context, source_share, destination_share, source_snapshots, 

1428 snapshot_mappings, share_server=None, 

1429 destination_share_server=None): 

1430 """Is called to start share migration.""" 

1431 

1432 src_dataset_name = self.private_storage.get( 

1433 source_share['id'], 'dataset_name') 

1434 dst_dataset_name = self._get_dataset_name(destination_share) 

1435 backend_name = share_utils.extract_host( 

1436 destination_share['host'], level='backend_name') 

1437 ssh_cmd = '%(username)s@%(host)s' % { 

1438 'username': self.configuration.zfs_ssh_username, 

1439 'host': self.configuration.zfs_service_ip, 

1440 } 

1441 config = get_backend_configuration(backend_name) 

1442 remote_ssh_cmd = '%(username)s@%(host)s' % { 

1443 'username': config.zfs_ssh_username, 

1444 'host': config.zfs_service_ip, 

1445 } 

1446 snapshot_tag = self._get_migration_snapshot_tag(destination_share) 

1447 src_snapshot_name = ( 

1448 '%(dataset_name)s@%(snapshot_tag)s' % { 

1449 'snapshot_tag': snapshot_tag, 

1450 'dataset_name': src_dataset_name, 

1451 } 

1452 ) 

1453 

1454 # Save valuable data to DB 

1455 self.private_storage.update(source_share['id'], { 

1456 'migr_snapshot_tag': snapshot_tag, 

1457 }) 

1458 self.private_storage.update(destination_share['id'], { 

1459 'entity_type': 'share', 

1460 'dataset_name': dst_dataset_name, 

1461 'ssh_cmd': remote_ssh_cmd, 

1462 'pool_name': share_utils.extract_host( 

1463 destination_share['host'], level='pool'), 

1464 'migr_snapshot_tag': snapshot_tag, 

1465 }) 

1466 

1467 # Create temporary snapshot on src host. 

1468 self.execute('sudo', 'zfs', 'snapshot', src_snapshot_name) 

1469 

1470 # Send/receive temporary snapshot 

1471 cmd = ( 

1472 'ssh ' + ssh_cmd + ' ' 

1473 'sudo zfs send -vDR ' + src_snapshot_name + ' ' 

1474 '| ssh ' + remote_ssh_cmd + ' ' 

1475 'sudo zfs receive -v ' + dst_dataset_name 

1476 ) 

1477 filename = dst_dataset_name.replace('/', '_') 

1478 with utils.tempdir() as tmpdir: 

1479 tmpfilename = os.path.join(tmpdir, '%s.sh' % filename) 

1480 with open(tmpfilename, "w") as migr_script: 

1481 migr_script.write(cmd) 

1482 self.execute('sudo', 'chmod', '755', tmpfilename) 

1483 self.execute('nohup', tmpfilename, '&') 

1484 

1485 @ensure_share_server_not_provided 

1486 def migration_continue( 

1487 self, context, source_share, destination_share, source_snapshots, 

1488 snapshot_mappings, share_server=None, 

1489 destination_share_server=None): 

1490 """Is called in source share's backend to continue migration.""" 

1491 

1492 snapshot_tag = self.private_storage.get( 

1493 destination_share['id'], 'migr_snapshot_tag') 

1494 

1495 out, err = self.execute('ps', 'aux') 

1496 if not '@%s' % snapshot_tag in out: 

1497 dst_dataset_name = self.private_storage.get( 

1498 destination_share['id'], 'dataset_name') 

1499 try: 

1500 self.execute( 

1501 'sudo', 'zfs', 'get', 'quota', dst_dataset_name, 

1502 executor=self._get_shell_executor_by_host( 

1503 destination_share['host']), 

1504 ) 

1505 return True 

1506 except exception.ProcessExecutionError as e: 

1507 raise exception.ZFSonLinuxException(msg=_( 

1508 'Migration process is absent and dst dataset ' 

1509                     'returned the following error: %s') % e)

1510 

1511 @ensure_share_server_not_provided 

1512 def migration_complete( 

1513 self, context, source_share, destination_share, source_snapshots, 

1514 snapshot_mappings, share_server=None, 

1515 destination_share_server=None): 

1516 """Is called to perform 2nd phase of driver migration of a given share. 

1517 

1518 """ 

1519 dst_dataset_name = self.private_storage.get( 

1520 destination_share['id'], 'dataset_name') 

1521 snapshot_tag = self.private_storage.get( 

1522 destination_share['id'], 'migr_snapshot_tag') 

1523 dst_snapshot_name = ( 

1524 '%(dataset_name)s@%(snapshot_tag)s' % { 

1525 'snapshot_tag': snapshot_tag, 

1526 'dataset_name': dst_dataset_name, 

1527 } 

1528 ) 

1529 

1530 dst_executor = self._get_shell_executor_by_host( 

1531 destination_share['host']) 

1532 

1533 # Destroy temporary migration snapshot on dst host 

1534 self.execute( 

1535 'sudo', 'zfs', 'destroy', dst_snapshot_name, 

1536 executor=dst_executor, 

1537 ) 

1538 

1539 # Get export locations of new share instance 

1540 export_locations = self._get_share_helper( 

1541 destination_share['share_proto']).create_exports( 

1542 dst_dataset_name, 

1543 executor=dst_executor) 

1544 

1545 # Destroy src share and temporary migration snapshot on src (this) host 

1546 self.delete_share(context, source_share) 

1547 

1548 return {'export_locations': export_locations} 

1549 

1550 @ensure_share_server_not_provided 

1551 def migration_cancel( 

1552 self, context, source_share, destination_share, source_snapshots, 

1553 snapshot_mappings, share_server=None, 

1554 destination_share_server=None): 

1555 """Is called to cancel driver migration.""" 

1556 

1557 src_dataset_name = self.private_storage.get( 

1558 source_share['id'], 'dataset_name') 

1559 dst_dataset_name = self.private_storage.get( 

1560 destination_share['id'], 'dataset_name') 

1561 ssh_cmd = self.private_storage.get( 

1562 destination_share['id'], 'ssh_cmd') 

1563 snapshot_tag = self.private_storage.get( 

1564 destination_share['id'], 'migr_snapshot_tag') 

1565 

1566         # Kill the migration process if it exists

1567 try: 

1568 out, err = self.execute('ps', 'aux') 

1569 lines = out.split('\n') 

1570 for line in lines: 

1571 if '@%s' % snapshot_tag in line: 

1572 migr_pid = [ 

1573 x for x in line.strip().split(' ') if x != ''][1] 

1574 self.execute('sudo', 'kill', '-9', migr_pid) 

1575 except exception.ProcessExecutionError as e: 

1576 LOG.warning( 

1577 "Caught following error trying to kill migration process: %s", 

1578 e) 

1579 

1580         # Sleep a couple of seconds before destroying updated objects

1581 time.sleep(2) 

1582 

1583 # Destroy snapshot on source host 

1584 self._delete_dataset_or_snapshot_with_retry( 

1585 src_dataset_name + '@' + snapshot_tag) 

1586 

1587 # Destroy dataset and its migration snapshot on destination host 

1588 try: 

1589 self.execute( 

1590 'ssh', ssh_cmd, 

1591 'sudo', 'zfs', 'destroy', '-r', dst_dataset_name, 

1592 ) 

1593 except exception.ProcessExecutionError as e: 

1594 LOG.warning( 

1595 "Failed to destroy destination dataset with following error: " 

1596 "%s", 

1597 e) 

1598 

1599 LOG.debug( 

1600 "Migration of share with ID '%s' has been canceled.", 

1601 source_share["id"])