Coverage for drivers/cowutil.py : 66%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2#
3# Copyright (C) 2024 Vates SAS
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union
19from abc import ABC, abstractmethod
20from enum import IntEnum
22import errno
23import time
25import util
27from vditype import VdiType
29# ------------------------------------------------------------------------------
# Bit 8 is set in the value of every COW (presumably copy-on-write) image
# format; RAW is the only format defined here without it.
IMAGE_FORMAT_COW_FLAG: Final = 0x100


class ImageFormat(IntEnum):
    """Supported image formats; the COW flag is encoded in the member value."""

    RAW = 1
    VHD = IMAGE_FORMAT_COW_FLAG | 2
    QCOW2 = IMAGE_FORMAT_COW_FLAG | 3


# Image format -> lower-case name.
IMAGE_FORMAT_TO_STR: Final = {
    ImageFormat.RAW: "raw",
    ImageFormat.VHD: "vhd",
    ImageFormat.QCOW2: "qcow2",
}

# Reverse lookup of IMAGE_FORMAT_TO_STR: lower-case name -> image format.
STR_TO_IMAGE_FORMAT: Final = {
    name: image_format for image_format, name in IMAGE_FORMAT_TO_STR.items()
}
46# ------------------------------------------------------------------------------
def parseImageFormats(str_formats: Optional[str], default_formats: List[ImageFormat]) -> List[ImageFormat]:
    """
    Parse a comma-separated list of image format names.

    Each entry is stripped of surrounding whitespace and looked up in
    STR_TO_IMAGE_FORMAT (the lookup is case-sensitive). Unknown entries are
    ignored and duplicates are dropped, keeping the first occurrence, so the
    order of the input is preserved.

    `default_formats` itself (not a copy) is returned when `str_formats` is
    None or empty, or when none of its entries is a known format name.
    """
    if not str_formats:
        return default_formats

    image_formats: List[ImageFormat] = []
    for raw_entry in str_formats.split(","):
        image_format = STR_TO_IMAGE_FORMAT.get(raw_entry.strip())
        # Skip unknown names and formats that were already collected.
        if not image_format or image_format in image_formats:
            continue
        image_formats.append(image_format)

    return image_formats or default_formats
65# ------------------------------------------------------------------------------
class CowImageInfo:
    """
    Plain record describing one image.

    All fields are class-level defaults; only `uuid` is set per instance by
    the constructor. The other fields are meant to be assigned by whoever
    builds the record.
    """

    uuid: str = ""
    path: str = ""
    # Sizes default to -1 until they are filled in.
    sizeVirt: int = -1
    sizePhys: int = -1
    sizeAllocated: int = -1
    hidden: bool = False
    parentUuid: str = ""
    parentPath: str = ""
    error: Any = 0

    def __init__(self, uuid: str) -> None:
        self.uuid = uuid
81# ------------------------------------------------------------------------------
class CowUtil(ABC):
    """
    Abstract interface for image-format helpers.

    Every method decorated with @abstractmethod has no implementation here
    and must be provided by the concrete subclass. Only getParentChain(),
    isCowImage() and _ioretry() are implemented in this base class.
    """

    class CheckResult(IntEnum):
        """Result values returned by check()."""

        Success = 0
        Fail = 1
        Unavailable = 2

    # --- Format limits ---------------------------------------------------

    @abstractmethod
    def getMinImageSize(self) -> int:
        ...

    @abstractmethod
    def getMaxImageSize(self) -> int:
        ...

    @abstractmethod
    def getBlockSize(self, path: str) -> int:
        ...

    @abstractmethod
    def getFooterSize(self) -> int:
        ...

    @abstractmethod
    def getDefaultPreallocationSizeVirt(self) -> int:
        ...

    @abstractmethod
    def getMaxChainLength(self) -> int:
        ...

    # --- Overhead computation --------------------------------------------

    @abstractmethod
    def calcOverheadEmpty(self, virtual_size: int, block_size: Optional[int] = None) -> int:
        ...

    @abstractmethod
    def calcOverheadBitmap(self, virtual_size: int) -> int:
        ...

    # --- Image information -----------------------------------------------

    @abstractmethod
    def getInfo(
        self,
        path: str,
        extractUuidFunction: Callable[[str], str],
        includeParent: bool = True,
        resolveParent: bool = True,
        useBackupFooter: bool = False
    ) -> CowImageInfo:
        ...

    @abstractmethod
    def getInfoFromLVM(
        self,
        lvName: str,
        extractUuidFunction: Callable[[str], str],
        vgName: str
    ) -> Optional[CowImageInfo]:
        ...

    @abstractmethod
    def getAllInfoFromVG(
        self,
        pattern: str,
        extractUuidFunction: Callable[[str], str],
        vgName: Optional[str] = None,
        parents: bool = False,
        exitOnError: bool = False
    ) -> Dict[str, CowImageInfo]:
        ...

    # --- Parent handling -------------------------------------------------

    @abstractmethod
    def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]:
        ...

    @abstractmethod
    def getParentNoCheck(self, path: str) -> Optional[str]:
        ...

    @abstractmethod
    def hasParent(self, path: str) -> bool:
        ...

    @abstractmethod
    def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None:
        ...

    # --- Hidden flag -----------------------------------------------------

    @abstractmethod
    def getHidden(self, path: str) -> bool:
        ...

    @abstractmethod
    def setHidden(self, path: str, hidden: bool = True) -> None:
        ...

    # --- Sizes -----------------------------------------------------------

    @abstractmethod
    def getSizeVirt(self, path: str) -> int:
        ...

    @abstractmethod
    def setSizeVirt(self, path: str, size: int, jFile: str) -> None:
        ...

    @abstractmethod
    def setSizeVirtFast(self, path: str, size: int) -> None:
        ...

    @abstractmethod
    def getMaxResizeSize(self, path: str) -> int:
        ...

    @abstractmethod
    def getSizePhys(self, path: str) -> int:
        ...

    @abstractmethod
    def setSizePhys(self, path: str, size: int, debug: bool = True) -> None:
        ...

    @abstractmethod
    def getAllocatedSize(self, path: str) -> int:
        ...

    @abstractmethod
    def getResizeJournalSize(self) -> int:
        ...

    # --- Image operations ------------------------------------------------

    @abstractmethod
    def killData(self, path: str) -> None:
        ...

    @abstractmethod
    def getDepth(self, path: str) -> int:
        ...

    @abstractmethod
    def getBlockBitmap(self, path: str) -> bytes:
        ...

    @abstractmethod
    def coalesce(self, path: str) -> int:
        ...

    @abstractmethod
    def create(
        self,
        path: str,
        size: int,
        static: bool,
        msize: int = 0,
        block_size: Optional[int] = None
    ) -> None:
        ...

    @abstractmethod
    def snapshot(
        self,
        path: str,
        parent: str,
        parentRaw: bool,
        msize: int = 0,
        checkEmpty: bool = True,
        is_mirror_image: bool = False
    ) -> None:
        ...

    @abstractmethod
    def canSnapshotRaw(self, size: int) -> bool:
        ...

    # --- Check and repair ------------------------------------------------

    @abstractmethod
    def check(
        self,
        path: str,
        ignoreMissingFooter: bool = False,
        fast: bool = False
    ) -> 'CowUtil.CheckResult':
        ...

    @abstractmethod
    def revert(self, path: str, jFile: str) -> None:
        ...

    @abstractmethod
    def repair(self, path: str) -> None:
        ...

    @abstractmethod
    def validateAndRoundImageSize(self, size: int) -> int:
        ...

    # --- Key handling ----------------------------------------------------

    @abstractmethod
    def getKeyHash(self, path: str) -> Optional[str]:
        ...

    @abstractmethod
    def setKey(self, path: str, key_hash: str) -> None:
        ...

    # --- Online coalesce -------------------------------------------------
    # coalesceOnline() and cancelCoalesceOnline() are only available when
    # isCoalesceableOnRemote() returns True. Otherwise both functions should
    # raise NotImplementedError.

    @abstractmethod
    def isCoalesceableOnRemote(self) -> bool:
        ...

    @abstractmethod
    def coalesceOnline(self, path: str) -> int:
        ...

    @abstractmethod
    def cancelCoalesceOnline(self, path: str) -> None:
        ...

    # --- Concrete helpers ------------------------------------------------

    def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]:
        """
        Get the parent chain of 'lvName' as a {uuid: path} mapping.

        The images are obtained from getAllInfoFromVG(), called with
        `lvName` as the pattern and with both `parents` and `exitOnError`
        enabled. While that call returns nothing it is retried once per
        second; after more than 60 retries two errors are logged and an
        empty mapping is returned.

        Safe to call for raw VDIs as well.
        """
        vdis: Dict[str, CowImageInfo] = {}
        retries = 0
        while True:
            if retries > 60:
                util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % retries)
                util.SMlog('ERROR: the image metadata might be corrupted')
                break
            vdis = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True)
            if vdis:
                break
            retries += 1
            time.sleep(1)

        return {uuid: vdi.path for uuid, vdi in vdis.items()}

    @staticmethod
    def isCowImage(image_format: ImageFormat) -> bool:
        """Tell whether `image_format` has the IMAGE_FORMAT_COW_FLAG bit set."""
        return (image_format & IMAGE_FORMAT_COW_FLAG) != 0

    @staticmethod
    def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]:
        """
        Run `cmd` with util.pread2() through util.ioretry().

        EIO and EAGAIN are passed as the error list of util.ioretry();
        presumably these are the errors on which the command is retried.
        """
        def run_command():
            return util.pread2(cmd, text=text)

        return util.ioretry(run_command, errlist=[errno.EIO, errno.EAGAIN])
316# ------------------------------------------------------------------------------
def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat:
    """
    Map a VdiType value to the matching ImageFormat.

    Raises:
        AssertionError: if `vdi_type` is not one of VdiType.RAW, VdiType.VHD
            or VdiType.QCOW2.
    """
    if vdi_type == VdiType.RAW:
        return ImageFormat.RAW
    if vdi_type == VdiType.VHD:
        return ImageFormat.VHD
    if vdi_type == VdiType.QCOW2:
        return ImageFormat.QCOW2

    # Raised explicitly rather than with `assert False`: assert statements are
    # removed under `python -O`, which would make this function silently
    # return None for an unsupported type.
    raise AssertionError(f"Unsupported vdi type: {vdi_type}")
def getImageStringFromVdiType(vdi_type: str) -> str:
    """Return the IMAGE_FORMAT_TO_STR name of the image format matching `vdi_type`."""
    image_format = getImageFormatFromVdiType(vdi_type)
    return IMAGE_FORMAT_TO_STR[image_format]
def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str:
    """
    Map an ImageFormat to the matching VdiType value.

    Raises:
        AssertionError: if `image_format` is not RAW, VHD or QCOW2.
    """
    if image_format == ImageFormat.RAW:
        return VdiType.RAW
    if image_format == ImageFormat.VHD:
        return VdiType.VHD
    if image_format == ImageFormat.QCOW2:
        return VdiType.QCOW2

    # The value is formatted directly: looking it up in IMAGE_FORMAT_TO_STR
    # cannot succeed here (every key of that table was handled above) and
    # would raise KeyError instead of reporting the unsupported format.
    # Raised explicitly because `assert` is removed under `python -O`.
    raise AssertionError(f"Unsupported image format: {image_format}")
341# ------------------------------------------------------------------------------
def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil:
    """
    Create the CowUtil implementation handling `image_format`.

    RAW and VHD are both served by vhdutil.VhdUtil; QCOW2 is served by
    qcow2util.QCowUtil.

    Raises:
        AssertionError: if `image_format` is not RAW, VHD or QCOW2.
    """
    # Imported at call time rather than at module level (presumably to avoid
    # a circular import between this module and the implementations).
    import vhdutil
    import qcow2util

    if image_format in (ImageFormat.RAW, ImageFormat.VHD):
        return vhdutil.VhdUtil()

    if image_format == ImageFormat.QCOW2:
        return qcow2util.QCowUtil()

    # Raised explicitly rather than with `assert False`: assert statements are
    # removed under `python -O`, which would make this function silently
    # return None instead of a CowUtil.
    raise AssertionError(f"Unsupported image format: {image_format}")
def getCowUtil(vdi_type: str) -> CowUtil:
    """Create the CowUtil implementation for the image format matching `vdi_type`."""
    image_format = getImageFormatFromVdiType(vdi_type)
    return getCowUtilFromImageFormat(image_format)