Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Copyright (C) 2024 Vates SAS 

4# 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16 

17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union 

18 

19from abc import ABC, abstractmethod 

20from enum import IntEnum 

21 

22import errno 

23import time 

24 

25import util 

26 

27from vditype import VdiType 

28 

29# ------------------------------------------------------------------------------ 

30 

31IMAGE_FORMAT_COW_FLAG: Final = 1 << 8 

32 

33class ImageFormat(IntEnum): 

34 RAW = 1 

35 VHD = 2 | IMAGE_FORMAT_COW_FLAG 

36 QCOW2 = 3 | IMAGE_FORMAT_COW_FLAG 

37 

38IMAGE_FORMAT_TO_STR: Final = { 

39 ImageFormat.RAW: "raw", 

40 ImageFormat.VHD: "vhd", 

41 ImageFormat.QCOW2: "qcow2" 

42} 

43 

44STR_TO_IMAGE_FORMAT: Final = {v: k for k, v in IMAGE_FORMAT_TO_STR.items()} 

45 

46# ------------------------------------------------------------------------------ 

47 

48def parseImageFormats(str_formats: Optional[str], default_formats: List[ImageFormat]) -> List[ImageFormat]: 

49 if not str_formats: 49 ↛ 52line 49 didn't jump to line 52, because the condition on line 49 was never false

50 return default_formats 

51 

52 entries = [entry.strip() for entry in str_formats.split(",")] 

53 

54 image_formats: List[ImageFormat] = [] 

55 for entry in entries: 

56 image_format = STR_TO_IMAGE_FORMAT.get(entry) 

57 if image_format and image_format not in image_formats: 

58 image_formats.append(image_format) 

59 

60 if image_formats: 

61 return image_formats 

62 

63 return default_formats 

64 

65# ------------------------------------------------------------------------------ 

66 

67class CowImageInfo(object): 

68 uuid = "" 

69 path = "" 

70 sizeVirt = -1 

71 sizePhys = -1 

72 sizeAllocated = -1 

73 hidden = False 

74 parentUuid = "" 

75 parentPath = "" 

76 error: Any = 0 

77 

78 def __init__(self, uuid): 

79 self.uuid = uuid 

80 

81# ------------------------------------------------------------------------------ 

82 

83class CowUtil(ABC): 

84 class CheckResult(IntEnum): 

85 Success = 0 

86 Fail = 1 

87 Unavailable = 2 

88 

89 @abstractmethod 

90 def getMinImageSize(self) -> int: 

91 pass 

92 

93 @abstractmethod 

94 def getMaxImageSize(self) -> int: 

95 pass 

96 

97 @abstractmethod 

98 def getBlockSize(self, path: str) -> int: 

99 pass 

100 

101 @abstractmethod 

102 def getFooterSize(self) -> int: 

103 pass 

104 

105 @abstractmethod 

106 def getDefaultPreallocationSizeVirt(self) -> int: 

107 pass 

108 

109 @abstractmethod 

110 def getMaxChainLength(self) -> int: 

111 pass 

112 

113 @abstractmethod 

114 def calcOverheadEmpty(self, virtual_size: int, block_size: Optional[int] = None) -> int: 

115 pass 

116 

117 @abstractmethod 

118 def calcOverheadBitmap(self, virtual_size: int) -> int: 

119 pass 

120 

121 @abstractmethod 

122 def getInfo( 

123 self, 

124 path: str, 

125 extractUuidFunction: Callable[[str], str], 

126 includeParent: bool = True, 

127 resolveParent: bool = True, 

128 useBackupFooter: bool = False 

129 ) -> CowImageInfo: 

130 pass 

131 

132 @abstractmethod 

133 def getInfoFromLVM( 

134 self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str 

135 ) -> Optional[CowImageInfo]: 

136 pass 

137 

138 @abstractmethod 

139 def getAllInfoFromVG( 

140 self, 

141 pattern: str, 

142 extractUuidFunction: Callable[[str], str], 

143 vgName: Optional[str] = None, 

144 parents: bool = False, 

145 exitOnError: bool = False 

146 ) -> Dict[str, CowImageInfo]: 

147 pass 

148 

149 @abstractmethod 

150 def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]: 

151 pass 

152 

153 @abstractmethod 

154 def getParentNoCheck(self, path: str) -> Optional[str]: 

155 pass 

156 

157 @abstractmethod 

158 def hasParent(self, path: str) -> bool: 

159 pass 

160 

161 @abstractmethod 

162 def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None: 

163 pass 

164 

165 @abstractmethod 

166 def getHidden(self, path: str) -> bool: 

167 pass 

168 

169 @abstractmethod 

170 def setHidden(self, path: str, hidden: bool = True) -> None: 

171 pass 

172 

173 @abstractmethod 

174 def getSizeVirt(self, path: str) -> int: 

175 pass 

176 

177 @abstractmethod 

178 def setSizeVirt(self, path: str, size: int, jFile: str) -> None: 

179 pass 

180 

181 @abstractmethod 

182 def setSizeVirtFast(self, path: str, size: int) -> None: 

183 pass 

184 

185 @abstractmethod 

186 def getMaxResizeSize(self, path: str) -> int: 

187 pass 

188 

189 @abstractmethod 

190 def getSizePhys(self, path: str) -> int: 

191 pass 

192 

193 @abstractmethod 

194 def setSizePhys(self, path: str, size: int, debug: bool = True) -> None: 

195 pass 

196 

197 @abstractmethod 

198 def getAllocatedSize(self, path: str) -> int: 

199 pass 

200 

201 @abstractmethod 

202 def getResizeJournalSize(self) -> int: 

203 pass 

204 

205 @abstractmethod 

206 def killData(self, path: str) -> None: 

207 pass 

208 

209 @abstractmethod 

210 def getDepth(self, path: str) -> int: 

211 pass 

212 

213 @abstractmethod 

214 def getBlockBitmap(self, path: str) -> bytes: 

215 pass 

216 

217 @abstractmethod 

218 def coalesce(self, path: str) -> int: 

219 pass 

220 

221 @abstractmethod 

222 def create(self, path: str, size: int, static: bool, msize: int = 0, block_size: Optional[int] = None) -> None: 

223 pass 

224 

225 @abstractmethod 

226 def snapshot( 

227 self, 

228 path: str, 

229 parent: str, 

230 parentRaw: bool, 

231 msize: int = 0, 

232 checkEmpty: bool = True, 

233 is_mirror_image: bool = False 

234 ) -> None: 

235 pass 

236 

237 @abstractmethod 

238 def canSnapshotRaw(self, size: int) -> bool: 

239 pass 

240 

241 @abstractmethod 

242 def check( 

243 self, 

244 path: str, 

245 ignoreMissingFooter: bool = False, 

246 fast: bool = False 

247 ) -> 'CowUtil.CheckResult': 

248 pass 

249 

250 @abstractmethod 

251 def revert(self, path: str, jFile: str) -> None: 

252 pass 

253 

254 @abstractmethod 

255 def repair(self, path: str) -> None: 

256 pass 

257 

258 @abstractmethod 

259 def validateAndRoundImageSize(self, size: int) -> int: 

260 pass 

261 

262 @abstractmethod 

263 def getKeyHash(self, path: str) -> Optional[str]: 

264 pass 

265 

266 @abstractmethod 

267 def setKey(self, path: str, key_hash: str) -> None: 

268 pass 

269 

270 # The availability of coalesceOnline and cancelCoalesceOnline are dependent on isCoalesceableOnRemote() returning True 

271 # If not, both function should raise NotImplementedError 

272 @abstractmethod 

273 def isCoalesceableOnRemote(self) -> bool: 

274 pass 

275 

276 @abstractmethod 

277 def coalesceOnline(self, path: str) -> int: 

278 pass 

279 

280 @abstractmethod 

281 def cancelCoalesceOnline(self, path: str) -> None: 

282 pass 

283 

284 def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]: 

285 """ 

286 Get the chain of all parents of 'path'. Safe to call for raw VDI's as well. 

287 """ 

288 chain = {} 

289 vdis: Dict[str, CowImageInfo] = {} 

290 retries = 0 

291 while (not vdis): 

292 if retries > 60: 292 ↛ 293line 292 didn't jump to line 293, because the condition on line 292 was never true

293 util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % retries) 

294 util.SMlog('ERROR: the image metadata might be corrupted') 

295 break 

296 vdis = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True) 

297 if (not vdis): 297 ↛ 298line 297 didn't jump to line 298, because the condition on line 297 was never true

298 retries = retries + 1 

299 time.sleep(1) 

300 for uuid, vdi in vdis.items(): 300 ↛ 301line 300 didn't jump to line 301, because the loop on line 300 never started

301 chain[uuid] = vdi.path 

302 #util.SMlog("Parent chain for %s: %s" % (lvName, chain)) 

303 return chain 

304 

305 @staticmethod 

306 def isCowImage(image_format: ImageFormat) -> bool: 

307 return bool(image_format & IMAGE_FORMAT_COW_FLAG) 

308 

309 @staticmethod 

310 def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]: 

311 return util.ioretry( 

312 lambda: util.pread2(cmd, text=text), 

313 errlist=[errno.EIO, errno.EAGAIN] 

314 ) 

315 

316# ------------------------------------------------------------------------------ 

317 

318def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat: 

319 if vdi_type == VdiType.RAW: 

320 return ImageFormat.RAW 

321 if vdi_type == VdiType.VHD: 

322 return ImageFormat.VHD 

323 if vdi_type == VdiType.QCOW2: 323 ↛ 326line 323 didn't jump to line 326, because the condition on line 323 was never false

324 return ImageFormat.QCOW2 

325 

326 assert False, f"Unsupported vdi type: {vdi_type}" 

327 

328def getImageStringFromVdiType(vdi_type: str) -> str: 

329 return IMAGE_FORMAT_TO_STR[getImageFormatFromVdiType(vdi_type)] 

330 

331def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str: 

332 if image_format == ImageFormat.RAW: 

333 return VdiType.RAW 

334 if image_format == ImageFormat.VHD: 

335 return VdiType.VHD 

336 if image_format == ImageFormat.QCOW2: 

337 return VdiType.QCOW2 

338 

339 assert False, f"Unsupported image format: {IMAGE_FORMAT_TO_STR[image_format]}" 

340 

341# ------------------------------------------------------------------------------ 

342 

343def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil: 

344 import vhdutil 

345 import qcow2util 

346 

347 if image_format in (ImageFormat.RAW, ImageFormat.VHD): 

348 return vhdutil.VhdUtil() 

349 

350 if image_format == ImageFormat.QCOW2: 350 ↛ 353line 350 didn't jump to line 353, because the condition on line 350 was never false

351 return qcow2util.QCowUtil() 

352 

353 assert False, f"Unsupported image format: {image_format}" 

354 

355def getCowUtil(vdi_type: str) -> CowUtil: 

356 return getCowUtilFromImageFormat(getImageFormatFromVdiType(vdi_type))