Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Copyright (C) 2020 Vates SAS - ronan.abhamon@vates.fr 

4# 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16 

17from sm_typing import List, override 

18 

19from linstorjournaler import LinstorJournaler 

20from linstorvolumemanager import LinstorVolumeManager 

21 

22from concurrent.futures import ThreadPoolExecutor 

23import base64 

24import errno 

25import json 

26import socket 

27import threading 

28import time 

29import util 

30import vhdutil 

31import xs_errors 

32 

# Plugin name given to `session.xenapi.host.call_plugin` by `call_remote_method`.
MANAGER_PLUGIN = 'linstor-manager'

34 

35 

def call_remote_method(session, host_ref, method, args):
    """Execute `method` of the `MANAGER_PLUGIN` plugin on the host `host_ref`.

    Both the failure and the success of the call are written to the SM log.

    :param session: Session object exposing `xenapi.host.call_plugin`.
    :param host_ref: Reference of the host that must run the plugin.
    :param method: Name of the plugin function to execute.
    :param args: Arguments forwarded as-is to the plugin function.
    :return: The response returned by `call_plugin`.
    :raises util.SMException: If the plugin call raises any exception; the
        message is the string form of the original error.
    """
    try:
        host_api = session.xenapi.host
        response = host_api.call_plugin(host_ref, MANAGER_PLUGIN, method, args)
    except Exception as e:
        failure = 'call-plugin on {} ({} with {}) exception: {}'.format(
            host_ref, method, args, e
        )
        util.SMlog(failure)
        raise util.SMException(str(e))

    success = 'call-plugin on {} ({} with {}) returned: {}'.format(
        host_ref, method, args, response
    )
    util.SMlog(success)

    return response

52 

53 

def check_ex(path, ignoreMissingFooter=False, fast=False):
    """Run the `check` sub-command of `vhdutil.VHD_UTIL` on `path`.

    The command is executed through `vhdutil.ioretry`; nothing is returned,
    so a failure is only reported through whatever `ioretry` raises.

    :param path: Path given to the `-n` option.
    :param ignoreMissingFooter: When true, append the `-i` option.
    :param fast: When true, append the `-B` option.
    """
    # Optional switches, in the order they must appear on the command line.
    optional_flags = (
        ("-i", ignoreMissingFooter),
        ("-B", fast),
    )

    cmd = [vhdutil.VHD_UTIL, "check", vhdutil.OPT_LOG_ERR, "-n", path]
    cmd.extend(flag for flag, enabled in optional_flags if enabled)

    vhdutil.ioretry(cmd)

62 

63 

class LinstorCallException(util.SMException):
    """SM exception that wraps another error, kept in `cmd_err`."""

    def __init__(self, cmd_err):
        # The wrapped error stays reachable: handlers read `cmd_err` back.
        self.cmd_err = cmd_err

    @override
    def __str__(self) -> str:
        # The textual form is exactly the one of the wrapped error.
        wrapped = self.cmd_err
        return str(wrapped)

71 

72 

class ErofsLinstorCallException(LinstorCallException):
    """Raised by `LinstorVhdUtil._call_local_method` when a command fails
    with the code `errno.EROFS` or `errno.EMEDIUMTYPE`."""

75 

76 

class NoPathLinstorCallException(LinstorCallException):
    """Raised by `LinstorVhdUtil._call_local_method` when a command fails
    with the code `errno.ENOENT`."""

79 

def log_successful_call(target_host, device_path, vdi_uuid, remote_method, response):
    """Write a debug entry to the SM log for a call that succeeded on `target_host`.

    :param target_host: Label of the host on which the call was made.
    :param device_path: Device path the call was about.
    :param vdi_uuid: UUID of the VDI.
    :param remote_method: Name of the executed method.
    :param response: Value returned by the call (logged as a string).
    """
    template = 'Successful access on {} for device {} ({}): `{}` => {}'
    message = template.format(
        target_host, device_path, vdi_uuid, remote_method, str(response)
    )
    util.SMlog(message, priority=util.LOG_DEBUG)

84 

def log_failed_call(target_host, next_target, device_path, vdi_uuid, remote_method, e):
    """Write a debug entry to the SM log for a call that failed on `target_host`.

    :param target_host: Label of the host on which the call failed.
    :param next_target: Label of the host that will be tried next.
    :param device_path: Device path the call was about.
    :param vdi_uuid: UUID of the VDI.
    :param remote_method: Name of the method that failed.
    :param e: Cause of the failure (an exception or a plain description).
    """
    template = 'Failed to call method on {} for device {} ({}): {}. Trying accessing on {}... (cause: {})'
    message = template.format(
        target_host, device_path, vdi_uuid, remote_method, next_target, e
    )
    util.SMlog(message, priority=util.LOG_DEBUG)

89 

def linstorhostcall(local_method, remote_method):
    """Build a decorator that turns a response parser into a getter with host fallback.

    The decorated function must accept `(self, vdi_uuid, response)`: it only
    converts the raw response of a remote plugin call. The wrapper that
    replaces it is called as `wrapper(self, vdi_uuid, *args, **kwargs)`.

    Without a session (`self._session` is falsy) the wrapper runs
    `local_method` locally. Otherwise it tries, in order: the host where the
    VDI is attached, the master, the primary hostname returned by
    `find_up_to_date_diskful_nodes`, then the first other returned node that
    matches a known host. Whenever the selected host is the current one,
    `local_method` is executed locally and its result is returned without
    going through the parser.

    :param local_method: Callable, or name of a method of `self`, executed
        through `self._call_local_method` with the device path.
    :param remote_method: Name of the plugin function executed through
        `call_remote_method`.
    :return: The decorator to apply to the response parser.
    """
    def decorated(response_parser):
        def wrapper(*args, **kwargs):
            self = args[0]
            vdi_uuid = args[1]

            device_path = self._linstor.build_device_path(
                self._linstor.get_volume_name(vdi_uuid)
            )

            # No session: a remote call is impossible, only the local one is tried.
            if not self._session:
                return self._call_local_method(local_method, device_path, *args[2:], **kwargs)

            # Extra positional arguments are only given to the local method;
            # keyword arguments are sent to the plugin, all converted to strings.
            remote_args = {
                'devicePath': device_path,
                'groupName': self._linstor.group_name
            }
            remote_args.update(**kwargs)
            remote_args = {str(key): str(value) for key, value in remote_args.items()}

            this_host_ref = util.get_this_host_ref(self._session)
            def call_method(host_label, host_ref):
                # The target is the current host: no need to go through the plugin.
                if host_ref == this_host_ref:
                    return self._call_local_method(local_method, device_path, *args[2:], **kwargs)
                response = call_remote_method(self._session, host_ref, remote_method, remote_args)
                log_successful_call(host_label, device_path, vdi_uuid, remote_method, response)
                return response_parser(self, vdi_uuid, response)

            # 1. Try on attached host.
            try:
                host_ref_attached = next(iter(util.get_hosts_attached_on(self._session, [vdi_uuid])), None)
                if host_ref_attached:
                    return call_method('attached host', host_ref_attached)
            except Exception as e:
                log_failed_call('attached host', 'master', device_path, vdi_uuid, remote_method, e)

            # 2. Try on master host.
            try:
                return call_method('master', util.get_master_ref(self._session))
            except Exception as e:
                log_failed_call('master', 'primary', device_path, vdi_uuid, remote_method, e)

            # 3. Try on a primary.
            hosts = self._get_hosts(remote_method, device_path)

            nodes, primary_hostname = self._linstor.find_up_to_date_diskful_nodes(vdi_uuid)
            if primary_hostname:
                try:
                    return call_method('primary', self._find_host_ref_from_hostname(hosts, primary_hostname))
                except Exception as remote_e:
                    # Always raises: step 4 is not attempted when a primary exists.
                    self._raise_openers_exception(device_path, remote_e)

            log_failed_call('primary', 'another node', device_path, vdi_uuid, remote_method, 'no primary')

            # 4. Try on any host with local data.
            try:
                # First node whose hostname matches a known host (None if there is none).
                return call_method('another node', next(filter(None,
                    (self._find_host_ref_from_hostname(hosts, hostname) for hostname in nodes)
                ), None))
            except Exception as remote_e:
                self._raise_openers_exception(device_path, remote_e)

        return wrapper
    return decorated

154 

155 

def linstormodifier():
    """Build a decorator for methods that modify a LINSTOR volume.

    The decorated method is executed normally, then
    `self._linstor.invalidate_resource_cache()` is called and the method
    result is returned. If the method raises, the cache is not invalidated
    and the exception propagates unchanged.

    The wrapper keeps the metadata (`__name__`, `__doc__`, ...) of the
    decorated method.

    :return: The decorator to apply to a method whose first argument is `self`.
    """
    # Function-scope import: `functools` is only needed to build the wrapper.
    import functools

    def decorated(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self = args[0]

            ret = func(*args, **kwargs)
            # Only reached on success: the cached resource state is now stale.
            self._linstor.invalidate_resource_cache()
            return ret
        return wrapper
    return decorated

166 

167 

class LinstorVhdUtil:
    """VHD helpers working on the device paths of LINSTOR volumes.

    Getters are built with `linstorhostcall`: they are called as
    `getter(vdi_uuid, ...)` and their body only parses the response of a
    remote plugin call. Setters are wrapped with `linstormodifier`, which
    invalidates the LINSTOR resource cache after a successful call.
    """

    MAX_SIZE = 2 * 1024 * 1024 * 1024 * 1024  # Max VHD size.

    def __init__(self, session, linstor):
        """
        :param session: XAPI session; when falsy, getters only run locally.
        :param linstor: Volume manager used to resolve names, paths and openers.
        """
        self._session = session
        self._linstor = linstor

    def create_chain_paths(self, vdi_uuid, readonly=False):
        """Check that the device of `vdi_uuid` and of each VHD ancestor can be opened.

        :param vdi_uuid: UUID of the leaf VDI.
        :param readonly: Open the leaf read-only; ancestors always are.
        :return: The device path of the leaf VDI.
        :raises xs_errors.XenError: `VDIUnavailable` if a device path does not exist.
        """
        # OPTIMIZE: Add a limit_to_first_allocated_block param to limit vhdutil calls.
        # Useful for the snapshot code algorithm.

        leaf_vdi_path = self._linstor.get_device_path(vdi_uuid)
        path = leaf_vdi_path
        while True:
            if not util.pathexists(path):
                raise xs_errors.XenError(
                    'VDIUnavailable', opterr='Could not find: {}'.format(path)
                )

            # Diskless path can be created on the fly, ensure we can open it.
            def check_volume_usable():
                while True:
                    try:
                        with open(path, 'r' if readonly else 'r+'):
                            pass
                    except IOError as e:
                        if e.errno == errno.ENODATA:
                            # Wait and try again without consuming a retry.
                            time.sleep(2)
                            continue
                        if e.errno == errno.EROFS or e.errno == errno.EMEDIUMTYPE:
                            util.SMlog('Volume not attachable because used. Openers: {}'.format(
                                self._linstor.get_volume_openers(vdi_uuid)
                            ))
                        raise
                    break
            util.retry(check_volume_usable, 15, 2)

            # Walk up the chain until a VDI without parent is found.
            vdi_uuid = self.get_vhd_info(vdi_uuid).parentUuid
            if not vdi_uuid:
                break
            path = self._linstor.get_device_path(vdi_uuid)
            readonly = True  # Non-leaf is always readonly.

        return leaf_vdi_path

    # --------------------------------------------------------------------------
    # Getters: read locally and try on another host in case of failure.
    # --------------------------------------------------------------------------

    def check(self, vdi_uuid, ignore_missing_footer=False, fast=False):
        """Return True if `_check` succeeds on `vdi_uuid`, False if it raises.

        The error is written to the SM log and never propagated.
        """
        kwargs = {
            'ignoreMissingFooter': ignore_missing_footer,
            'fast': fast
        }
        try:
            self._check(vdi_uuid, **kwargs)
            return True
        except Exception as e:
            util.SMlog('Call to `check` failed: {}'.format(e))
            return False

    @linstorhostcall(check_ex, 'check')
    def _check(self, vdi_uuid, response):
        """Parse the remote `check` response as a boolean."""
        return util.strtobool(response)

    def get_vhd_info(self, vdi_uuid, include_parent=True):
        """Return the VHD information of `vdi_uuid`.

        :param include_parent: Forwarded as `includeParent`.
        """
        kwargs = {
            'includeParent': include_parent,
            'resolveParent': False
        }
        return self._get_vhd_info(vdi_uuid, self._extract_uuid, **kwargs)

    @linstorhostcall(vhdutil.getVHDInfo, 'getVHDInfo')
    def _get_vhd_info(self, vdi_uuid, response):
        """Build a `vhdutil.VHDInfo` from the JSON response of `getVHDInfo`."""
        obj = json.loads(response)

        vhd_info = vhdutil.VHDInfo(vdi_uuid)
        vhd_info.sizeVirt = obj['sizeVirt']
        vhd_info.sizePhys = obj['sizePhys']
        # Parent fields are only set when the response describes a parent.
        if 'parentPath' in obj:
            vhd_info.parentPath = obj['parentPath']
            vhd_info.parentUuid = obj['parentUuid']
        vhd_info.hidden = obj['hidden']
        vhd_info.path = obj['path']

        return vhd_info

    @linstorhostcall(vhdutil.hasParent, 'hasParent')
    def has_parent(self, vdi_uuid, response):
        """Parse the remote `hasParent` response as a boolean."""
        return util.strtobool(response)

    def get_parent(self, vdi_uuid):
        """Return the parent of `vdi_uuid` as given by `_get_parent`."""
        return self._get_parent(vdi_uuid, self._extract_uuid)

    @linstorhostcall(vhdutil.getParent, 'getParent')
    def _get_parent(self, vdi_uuid, response):
        """Return the remote `getParent` response unchanged."""
        return response

    @linstorhostcall(vhdutil.getSizeVirt, 'getSizeVirt')
    def get_size_virt(self, vdi_uuid, response):
        """Parse the remote `getSizeVirt` response as an integer."""
        return int(response)

    @linstorhostcall(vhdutil.getMaxResizeSize, 'getMaxResizeSize')
    def get_max_resize_size(self, vdi_uuid, response):
        """Parse the remote `getMaxResizeSize` response as an integer."""
        return int(response)

    @linstorhostcall(vhdutil.getSizePhys, 'getSizePhys')
    def get_size_phys(self, vdi_uuid, response):
        """Parse the remote `getSizePhys` response as an integer."""
        return int(response)

    @linstorhostcall(vhdutil.getAllocatedSize, 'getAllocatedSize')
    def get_allocated_size(self, vdi_uuid, response):
        """Parse the remote `getAllocatedSize` response as an integer."""
        return int(response)

    @linstorhostcall(vhdutil.getDepth, 'getDepth')
    def get_depth(self, vdi_uuid, response):
        """Parse the remote `getDepth` response as an integer."""
        return int(response)

    @linstorhostcall(vhdutil.getKeyHash, 'getKeyHash')
    def get_key_hash(self, vdi_uuid, response):
        """Return the remote `getKeyHash` response, or None when it is empty."""
        return response or None

    @linstorhostcall(vhdutil.getBlockBitmap, 'getBlockBitmap')
    def get_block_bitmap(self, vdi_uuid, response):
        """Decode the base64 remote `getBlockBitmap` response."""
        return base64.b64decode(response)

    @linstorhostcall('_get_drbd_size', 'getDrbdSize')
    def get_drbd_size(self, vdi_uuid, response):
        """Parse the remote `getDrbdSize` response as an integer."""
        return int(response)

    def _get_drbd_size(self, path):
        """Return the size printed by `blockdev --getsize64` for `path`.

        :raises util.SMException: If the command exits with a non-zero code.
        """
        (ret, stdout, stderr) = util.doexec(['blockdev', '--getsize64', path])
        if ret == 0:
            return int(stdout.strip())
        raise util.SMException('Failed to get DRBD size: {}'.format(stderr))

    # --------------------------------------------------------------------------
    # Setters: only used locally.
    # --------------------------------------------------------------------------

    @linstormodifier()
    def create(self, path, size, static, msize=0):
        """Call `vhdutil.create` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.create, path, size, static, msize)

    @linstormodifier()
    def set_size_phys(self, path, size, debug=True):
        """Call `vhdutil.setSizePhys` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.setSizePhys, path, size, debug)

    @linstormodifier()
    def set_parent(self, path, parentPath, parentRaw=False):
        """Call `vhdutil.setParent` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.setParent, path, parentPath, parentRaw)

    @linstormodifier()
    def set_hidden(self, path, hidden=True):
        """Call `vhdutil.setHidden` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.setHidden, path, hidden)

    @linstormodifier()
    def set_key(self, path, key_hash):
        """Call `vhdutil.setKey` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.setKey, path, key_hash)

    @linstormodifier()
    def kill_data(self, path):
        """Call `vhdutil.killData` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.killData, path)

    @linstormodifier()
    def snapshot(self, path, parent, parentRaw, msize=0, checkEmpty=True):
        """Call `vhdutil.snapshot` locally on `path`."""
        return self._call_local_method_or_fail(vhdutil.snapshot, path, parent, parentRaw, msize, checkEmpty)

    def inflate(self, journaler, vdi_uuid, vdi_path, new_size, old_size):
        """Grow the LINSTOR volume of `vdi_uuid` and update the VHD physical size.

        Nothing is done when `new_size`, once rounded up with
        `LinstorVolumeManager.round_up_volume_size`, does not exceed `old_size`.
        An `INFLATE` journal entry holding `old_size` is created before the
        resize and removed once the VHD has been updated.
        """
        # Only inflate if the LINSTOR volume capacity is not enough.
        new_size = LinstorVolumeManager.round_up_volume_size(new_size)
        if new_size <= old_size:
            return

        util.SMlog(
            'Inflate {} (size={}, previous={})'
            .format(vdi_path, new_size, old_size)
        )

        journaler.create(
            LinstorJournaler.INFLATE, vdi_uuid, old_size
        )
        self._linstor.resize_volume(vdi_uuid, new_size)

        # The effective size is read back: it is the one written in the VHD.
        result_size = self.get_drbd_size(vdi_uuid)
        if result_size < new_size:
            util.SMlog(
                'WARNING: Cannot inflate volume to {}B, result size: {}B'
                .format(new_size, result_size)
            )

        self._zeroize(vdi_path, result_size - vhdutil.VHD_FOOTER_SIZE)
        self.set_size_phys(vdi_path, result_size, False)
        journaler.remove(LinstorJournaler.INFLATE, vdi_uuid)

    def deflate(self, vdi_path, new_size, old_size, zeroize=False):
        """Reduce the physical size recorded in the VHD at `vdi_path`.

        Only the VHD is updated; the LINSTOR volume itself is not resized
        (see the TODO below). Nothing is written when the rounded `new_size`
        is not smaller than `old_size`.

        :param zeroize: When true, zero out the area of the current footer first.
        """
        if zeroize:
            assert old_size > vhdutil.VHD_FOOTER_SIZE
            self._zeroize(vdi_path, old_size - vhdutil.VHD_FOOTER_SIZE)

        new_size = LinstorVolumeManager.round_up_volume_size(new_size)
        if new_size >= old_size:
            return

        util.SMlog(
            'Deflate {} (new size={}, previous={})'
            .format(vdi_path, new_size, old_size)
        )

        self.set_size_phys(vdi_path, new_size)
        # TODO: Change the LINSTOR volume size using linstor.resize_volume.

    # --------------------------------------------------------------------------
    # Remote setters: write locally and try on another host in case of failure.
    # --------------------------------------------------------------------------

    @linstormodifier()
    def set_size_virt(self, path, size, jfile):
        """Run `setSizeVirt` on `path`, locally or on another host."""
        kwargs = {
            'size': size,
            'jfile': jfile
        }
        return self._call_method(vhdutil.setSizeVirt, 'setSizeVirt', path, use_parent=False, **kwargs)

    @linstormodifier()
    def set_size_virt_fast(self, path, size):
        """Run `setSizeVirtFast` on `path`, locally or on another host."""
        kwargs = {
            'size': size
        }
        return self._call_method(vhdutil.setSizeVirtFast, 'setSizeVirtFast', path, use_parent=False, **kwargs)

    @linstormodifier()
    def force_parent(self, path, parentPath, parentRaw=False):
        """Run `setParent` on `path`, locally or on another host."""
        kwargs = {
            'parentPath': str(parentPath),
            'parentRaw': parentRaw
        }
        return self._call_method(vhdutil.setParent, 'setParent', path, use_parent=False, **kwargs)

    @linstormodifier()
    def force_coalesce(self, path):
        """Run `coalesce` on `path`, locally or on another host; return the result as an int."""
        return int(self._call_method(vhdutil.coalesce, 'coalesce', path, use_parent=True))

    @linstormodifier()
    def force_repair(self, path):
        """Run `repair` on `path`, locally or on another host."""
        return self._call_method(vhdutil.repair, 'repair', path, use_parent=False)

    @linstormodifier()
    def force_deflate(self, path, newSize, oldSize, zeroize):
        """Run `deflate` on `path`, locally or on another host."""
        kwargs = {
            'newSize': newSize,
            'oldSize': oldSize,
            'zeroize': zeroize
        }
        return self._call_method('_force_deflate', 'deflate', path, use_parent=False, **kwargs)

    def _force_deflate(self, path, newSize, oldSize, zeroize):
        """Local counterpart of the remote `deflate` method."""
        self.deflate(path, newSize, oldSize, zeroize)

    # --------------------------------------------------------------------------
    # Static helpers.
    # --------------------------------------------------------------------------

    @classmethod
    def compute_volume_size(cls, virtual_size, image_type):
        """Return the volume size needed to store an image of `virtual_size`.

        :param image_type: `vhdutil.VDI_TYPE_VHD` or `vhdutil.VDI_TYPE_RAW`.
        :return: The size rounded up with `LinstorVolumeManager.round_up_volume_size`.
        :raises Exception: If `image_type` is neither of the two types above.
        """
        if image_type == vhdutil.VDI_TYPE_VHD:
            # All LINSTOR VDIs have the metadata area preallocated for
            # the maximum possible virtual size (for fast online VDI.resize).
            meta_overhead = vhdutil.calcOverheadEmpty(cls.MAX_SIZE)
            bitmap_overhead = vhdutil.calcOverheadBitmap(virtual_size)
            virtual_size += meta_overhead + bitmap_overhead
        elif image_type != vhdutil.VDI_TYPE_RAW:
            raise Exception('Invalid image type: {}'.format(image_type))

        return LinstorVolumeManager.round_up_volume_size(virtual_size)

    # --------------------------------------------------------------------------
    # Helpers.
    # --------------------------------------------------------------------------

    def _extract_uuid(self, device_path):
        """Return the volume UUID matching `device_path` (trailing newlines removed)."""
        # TODO: Remove new line in the vhdutil module. Not here.
        return self._linstor.get_volume_uuid_from_device_path(
            device_path.rstrip('\n')
        )

    def _get_hosts(self, remote_method, device_path):
        """Return all host records of the pool.

        `remote_method` and `device_path` are only used in the error message.

        :raises xs_errors.XenError: `VDIUnavailable` if the records cannot be read.
        """
        try:
            return self._session.xenapi.host.get_all_records()
        except Exception as e:
            raise xs_errors.XenError(
                'VDIUnavailable',
                opterr='Unable to get host list to run vhdutil command `{}` (path={}): {}'
                .format(remote_method, device_path, e)
            )

    # --------------------------------------------------------------------------

    @staticmethod
    def _find_host_ref_from_hostname(hosts, hostname):
        """Return the first ref of `hosts` whose record has this `hostname`, else None."""
        return next((ref for ref, rec in hosts.items() if rec['hostname'] == hostname), None)

    def _raise_openers_exception(self, device_path, e):
        """Always raise an `Exception` describing `e` and the openers of `device_path`.

        When the openers cannot be retrieved, the reason is put in the message instead.
        """
        if isinstance(e, util.CommandException):
            e_str = 'cmd: `{}`, code: `{}`, reason: `{}`'.format(e.cmd, e.code, e.reason)
        else:
            e_str = str(e)

        try:
            volume_uuid = self._linstor.get_volume_uuid_from_device_path(
                device_path
            )
            e_wrapper = Exception(
                e_str + ' (openers: {})'.format(
                    self._linstor.get_volume_openers(volume_uuid)
                )
            )
        except Exception as illformed_e:
            e_wrapper = Exception(
                e_str + ' (unable to get openers: {})'.format(illformed_e)
            )
        util.SMlog('raise opener exception: {}'.format(e_wrapper))
        raise e_wrapper  # pylint: disable = E0702

    def _call_local_method(self, local_method, device_path, *args, **kwargs):
        """Run `local_method(device_path, *args, **kwargs)` with retries.

        :param local_method: Callable, or name of a method of this object.
        :raises ErofsLinstorCallException: On a `CommandException` whose code is
            `EROFS` or `EMEDIUMTYPE`.
        :raises NoPathLinstorCallException: On a `CommandException` whose code is `ENOENT`.
        :raises util.CommandException: For the other command failures.
        """
        if isinstance(local_method, str):
            local_method = getattr(self, local_method)

        try:
            def local_call():
                try:
                    return local_method(device_path, *args, **kwargs)
                except util.CommandException as e:
                    if e.code == errno.EROFS or e.code == errno.EMEDIUMTYPE:
                        raise ErofsLinstorCallException(e)  # Break retry calls.
                    if e.code == errno.ENOENT:
                        raise NoPathLinstorCallException(e)
                    raise e
            # Retry only locally if it's not an EROFS exception.
            return util.retry(local_call, 5, 2, exceptions=[util.CommandException])
        except util.CommandException as e:
            util.SMlog('failed to execute locally vhd-util (sys {})'.format(e.code))
            raise e

    def _call_local_method_or_fail(self, local_method, device_path, *args, **kwargs):
        """Same as `_call_local_method`, but report the openers on an EROFS failure."""
        try:
            return self._call_local_method(local_method, device_path, *args, **kwargs)
        except ErofsLinstorCallException as e:
            # Volume is locked on a host, find openers.
            self._raise_openers_exception(device_path, e.cmd_err)

    def _call_method(self, local_method, remote_method, device_path, use_parent, *args, **kwargs):
        """Run a method locally, then on a host that has the volume open if it fails.

        :param local_method: Callable, or name of a method of this object.
        :param remote_method: Name of the plugin function used for the remote attempt.
        :param use_parent: Look at the openers of the VHD parent instead of
            those of the volume itself.
        :raises xs_errors.XenError: `VDIUnavailable` when the hosts or openers
            cannot be read, or when every candidate host failed.
        """
        # Note: `use_parent` exists to know if the VHD parent is used by the local/remote method.
        # Normally in case of failure, if the parent is unused we try to execute the method on
        # another host using the DRBD opener list. In the other case, if the parent is required,
        # we must check where this last one is open instead of the child.

        if isinstance(local_method, str):
            local_method = getattr(self, local_method)

        # A. Try to write locally...
        try:
            return self._call_local_method(local_method, device_path, *args, **kwargs)
        except Exception:
            pass

        util.SMlog('unable to execute `{}` locally, retry using a writable host...'.format(remote_method))

        # B. Execute the command on another host.
        # B.1. Get host list.
        hosts = self._get_hosts(remote_method, device_path)

        # B.2. Prepare remote args.
        remote_args = {
            'devicePath': device_path,
            'groupName': self._linstor.group_name
        }
        remote_args.update(**kwargs)
        remote_args = {str(key): str(value) for key, value in remote_args.items()}

        volume_uuid = self._linstor.get_volume_uuid_from_device_path(
            device_path
        )
        parent_volume_uuid = None
        if use_parent:
            parent_volume_uuid = self.get_parent(volume_uuid)

        openers_uuid = parent_volume_uuid if use_parent else volume_uuid

        # B.3. Call!
        def remote_call():
            try:
                all_openers = self._linstor.get_volume_openers(openers_uuid)
            except Exception as e:
                raise xs_errors.XenError(
                    'VDIUnavailable',
                    opterr='Unable to get DRBD openers to run vhd-util command `{}` (path={}): {}'
                    .format(remote_method, device_path, e)
                )

            no_host_found = True
            for hostname, openers in all_openers.items():
                if not openers:
                    continue

                host_ref = self._find_host_ref_from_hostname(hosts, hostname)
                if not host_ref:
                    continue

                no_host_found = False
                try:
                    return call_remote_method(self._session, host_ref, remote_method, remote_args)
                except Exception:
                    # Failure already logged by `call_remote_method`: try the next opener.
                    pass

            # Nobody has the volume open: a last local attempt is made.
            if no_host_found:
                try:
                    return local_method(device_path, *args, **kwargs)
                except Exception as e:
                    self._raise_openers_exception(device_path, e)

            # Report the openers of every host, not only those of the last
            # host seen by the loop above.
            raise xs_errors.XenError(
                'VDIUnavailable',
                opterr='No valid host found to run vhd-util command `{}` (path=`{}`, openers=`{}`)'
                .format(remote_method, device_path, all_openers)
            )
        return util.retry(remote_call, 5, 2)

    @staticmethod
    def _zeroize(path, size):
        """Call `util.zeroOut(path, size, vhdutil.VHD_FOOTER_SIZE)`.

        NOTE(review): presumably zeroes `VHD_FOOTER_SIZE` bytes starting at
        offset `size` - confirm against `util.zeroOut`.

        :raises xs_errors.XenError: `EIO` if `util.zeroOut` returns a falsy value.
        """
        if not util.zeroOut(path, size, vhdutil.VHD_FOOTER_SIZE):
            raise xs_errors.XenError(
                'EIO',
                opterr='Failed to zero out VHD footer {}'.format(path)
            )

603 

class MultiLinstorVhdUtil:
    """Run a function over several VDIs with a thread pool.

    Each worker thread lazily creates its own session, `LinstorVolumeManager`
    and `LinstorVhdUtil`; every created session is logged out by `_cleanup`.
    """

    class ExecutorData(threading.local):
        """Per-thread storage of the session, volume manager and VHD util."""

        def __init__(self):
            self.clear()

        def clear(self):
            # Reset the three slots of the current thread.
            self.session = self.linstor = self.vhdutil = None

    class Load:
        """Holder of a session that must be logged out once unused."""

        def __init__(self, session):
            self.session = session

        def cleanup(self):
            # Nothing to do when the session was already released.
            if not self.session:
                return
            self.session.xenapi.session.logout()
            self.session = None

    def __init__(self, uri, group_name) -> None:
        """
        :param uri: URI given to each `LinstorVolumeManager`.
        :param group_name: Group name given to each `LinstorVolumeManager`.
        """
        self._uri = uri
        self._group_name = group_name
        self._loads: List[MultiLinstorVhdUtil.Load] = []
        self._executor_data = self.ExecutorData()

    def __del__(self):
        self._cleanup()

    def run(self, func, vdi_uuids):
        """Call `func(vdi_uuid, vhdutil)` for each UUID using a thread pool.

        :param func: Callable receiving a VDI UUID and the `LinstorVhdUtil`
            of the worker thread.
        :param vdi_uuids: Iterable of VDI UUIDs.
        :return: The iterator returned by `executor.map`.
        """
        def task(vdi_uuid):
            data = self._executor_data
            # First task executed by this thread: build its own resources.
            if not data.session:
                self._init_executor_thread()
            return func(vdi_uuid, data.vhdutil)

        with ThreadPoolExecutor(thread_name_prefix="VhdUtil") as executor:
            return executor.map(task, vdi_uuids)

    @property
    def local_vhdutil(self):
        """`LinstorVhdUtil` stored for the calling thread (None if never set)."""
        return self._executor_data.vhdutil

    def _init_executor_thread(self):
        """Create the session, volume manager and VHD util of the calling thread.

        On failure, the thread data is cleared, the session is logged out and
        the error is re-raised.
        """
        session = util.get_localAPI_session()
        load = self.Load(session)
        data = self._executor_data
        try:
            linstor = LinstorVolumeManager(
                self._uri,
                self._group_name,
                repair=False,
                logger=util.SMlog
            )
            data.linstor = linstor
            data.vhdutil = LinstorVhdUtil(session, linstor)
            data.session = session
        except BaseException:
            data.clear()
            load.cleanup()
            raise

        # Remembered so that `_cleanup` can log the session out later.
        self._loads.append(load)

    def _cleanup(self):
        """Log out every recorded session; failures are only logged."""
        for entry in self._loads:
            try:
                entry.cleanup()
            except Exception as e:
                util.SMlog(f"Failed to clean load executor: {e}")
        self._loads.clear()