Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Original work copyright (C) Citrix systems 

4# Modified work copyright (C) Vates SAS and XCP-ng community 

5# 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU Lesser General Public License as published 

8# by the Free Software Foundation; version 2.1 only. 

9# 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU Lesser General Public License for more details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with this program; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 

18# 

19# CEPHFSSR: Based on FileSR, mounts ceph fs share 

20 

21from sm_typing import override 

22 

23import errno 

24import os 

25import socket 

26import syslog as _syslog 

27import xmlrpc.client 

28from syslog import syslog 

29 

30# careful with the import order here 

31# FileSR has a circular dependency: 

32# FileSR -> blktap2 -> lvutil -> EXTSR -> FileSR 

33# importing in this order seems to avoid triggering the issue. 

34import SR 

35import SRCommand 

36import FileSR 

37# end of careful 

38import VDI 

39import cleanup 

40import lock 

41import util 

42import xs_errors 

43 

44CAPABILITIES = ["SR_PROBE", "SR_UPDATE", 

45 "VDI_CREATE", "VDI_DELETE", "VDI_ATTACH", "VDI_DETACH", 

46 "VDI_UPDATE", "VDI_CLONE", "VDI_SNAPSHOT", "VDI_RESIZE", "VDI_MIRROR", 

47 "VDI_GENERATE_CONFIG", 

48 "VDI_RESET_ON_BOOT/2", "ATOMIC_PAUSE"] 

49 

50CONFIGURATION = [ 

51 ['server', 'Ceph server(s) (required, ex: "192.168.0.12" or "10.10.10.10,10.10.10.26")'], 

52 ['serverpath', 'Ceph FS path (required, ex: "/")'], 

53 ['serverport', 'ex: 6789'], 

54 ['options', 'Ceph FS client name, and secretfile (required, ex: "name=admin,secretfile=/etc/ceph/admin.secret")'] 

55] 

56 

57DRIVER_INFO = { 

58 'name': 'CephFS VHD and QCOW2', 

59 'description': 'SR plugin which stores disks as VHD and QCOW2 files on a CephFS storage', 

60 'vendor': 'Vates SAS', 

61 'copyright': '(C) 2020 Vates SAS', 

62 'driver_version': '1.0', 

63 'required_api_version': '1.0', 

64 'capabilities': CAPABILITIES, 

65 'configuration': CONFIGURATION 

66} 

67 

68DRIVER_CONFIG = {"ATTACH_FROM_CONFIG_WITH_TAPDISK": True} 

69 

70# The mountpoint for the directory when performing an sr_probe. All probes 

71# are guaranteed to be serialised by xapi, so this single mountpoint is fine. 

72PROBE_MOUNTPOINT = os.path.join(SR.MOUNT_BASE, "probe") 

73 

74 

75class CephFSException(Exception): 

76 def __init__(self, errstr): 

77 self.errstr = errstr 

78 

79 

80# mountpoint = /var/run/sr-mount/CephFS/uuid 

81# linkpath = mountpoint/uuid - path to SR directory on share 

82# path = /var/run/sr-mount/uuid - symlink to SR directory on share 

83class CephFSSR(FileSR.FileSR): 

84 """Ceph file-based storage repository""" 

85 

86 DRIVER_TYPE = 'cephfs' 

87 

88 @override 

89 @staticmethod 

90 def handles(sr_type) -> bool: 

91 # fudge, because the parent class (FileSR) checks for smb to alter its behavior 

92 return sr_type == CephFSSR.DRIVER_TYPE or sr_type == 'smb' 

93 

94 @override 

95 def load(self, sr_uuid) -> None: 

96 if not self._is_ceph_available(): 

97 raise xs_errors.XenError( 

98 'SRUnavailable', 

99 opterr='ceph is not installed' 

100 ) 

101 

102 self.ops_exclusive = FileSR.OPS_EXCLUSIVE 

103 self.lock = lock.Lock(lock.LOCK_TYPE_SR, self.uuid) 

104 self.sr_vditype = SR.DEFAULT_TAP 

105 self.driver_config = DRIVER_CONFIG 

106 if 'server' not in self.dconf: 

107 raise xs_errors.XenError('ConfigServerMissing') 

108 self.remoteserver = self.dconf['server'] 

109 self.remotepath = self.dconf['serverpath'] 

110 # if serverport is not specified, use default 6789 

111 if 'serverport' not in self.dconf: 

112 self.remoteport = "6789" 

113 else: 

114 self.remoteport = self.dconf['serverport'] 

115 if self.sr_ref and self.session is not None: 

116 self.sm_config = self.session.xenapi.SR.get_sm_config(self.sr_ref) 

117 else: 

118 self.sm_config = self.srcmd.params.get('sr_sm_config') or {} 

119 self.mountpoint = os.path.join(SR.MOUNT_BASE, 'CephFS', sr_uuid) 

120 self.linkpath = os.path.join(self.mountpoint, sr_uuid or "") 

121 self.path = os.path.join(SR.MOUNT_BASE, sr_uuid) 

122 self._check_o_direct() 

123 

124 def checkmount(self): 

125 return util.ioretry(lambda: ((util.pathexists(self.mountpoint) and 

126 util.ismount(self.mountpoint)) and 

127 util.pathexists(self.path))) 

128 

129 def mount(self, mountpoint=None): 

130 """Mount the remote ceph export at 'mountpoint'""" 

131 if mountpoint is None: 

132 mountpoint = self.mountpoint 

133 elif not util.is_string(mountpoint) or mountpoint == "": 

134 raise CephFSException("mountpoint not a string object") 

135 

136 try: 

137 if not util.ioretry(lambda: util.isdir(mountpoint)): 

138 util.ioretry(lambda: util.makedirs(mountpoint)) 

139 except util.CommandException as inst: 

140 raise CephFSException("Failed to make directory: code is %d" % inst.code) 

141 

142 try: 

143 options = [] 

144 if 'options' in self.dconf: 

145 options.append(self.dconf['options']) 

146 if options: 

147 options = ['-o', ','.join(options)] 

148 acc = [] 

149 for server in self.remoteserver.split(','): 

150 try: 

151 addr_info = socket.getaddrinfo(server, 0)[0] 

152 except Exception: 

153 continue 

154 

155 acc.append('[' + server + ']' if addr_info[0] == socket.AF_INET6 else server) 

156 

157 remoteserver = ','.join(acc) 

158 command = ["mount", '-t', 'ceph', remoteserver + ":" + self.remoteport + ":" + self.remotepath, mountpoint] + options 

159 util.ioretry(lambda: util.pread(command), errlist=[errno.EPIPE, errno.EIO], maxretry=2, nofail=True) 

160 except util.CommandException as inst: 

161 syslog(_syslog.LOG_ERR, 'CephFS mount failed ' + inst.__str__()) 

162 raise CephFSException("mount failed with return code %d" % inst.code) 

163 

164 # Sanity check to ensure that the user has at least RO access to the 

165 # mounted share. Windows sharing and security settings can be tricky. 

166 try: 

167 util.listdir(mountpoint) 

168 except util.CommandException: 

169 try: 

170 self.unmount(mountpoint, True) 

171 except CephFSException: 

172 util.logException('CephFSSR.unmount()') 

173 raise CephFSException("Permission denied. Please check user privileges.") 

174 

175 def unmount(self, mountpoint, rmmountpoint): 

176 try: 

177 util.pread(["umount", mountpoint]) 

178 except util.CommandException as inst: 

179 raise CephFSException("umount failed with return code %d" % inst.code) 

180 if rmmountpoint: 

181 try: 

182 os.rmdir(mountpoint) 

183 except OSError as inst: 

184 raise CephFSException("rmdir failed with error '%s'" % inst.strerror) 

185 

186 @override 

187 def attach(self, sr_uuid) -> None: 

188 if not self.checkmount(): 

189 try: 

190 self.mount() 

191 os.symlink(self.linkpath, self.path) 

192 except CephFSException as exc: 

193 raise xs_errors.SROSError(12, exc.errstr) 

194 self.attached = True 

195 

196 @override 

197 def probe(self) -> str: 

198 try: 

199 self.mount(PROBE_MOUNTPOINT) 

200 sr_list = filter(util.match_uuid, util.listdir(PROBE_MOUNTPOINT)) 

201 self.unmount(PROBE_MOUNTPOINT, True) 

202 except (util.CommandException, xs_errors.XenError): 

203 raise 

204 # Create a dictionary from the SR uuids to feed SRtoXML() 

205 return util.SRtoXML({sr_uuid: {} for sr_uuid in sr_list}) 

206 

207 @override 

208 def detach(self, sr_uuid) -> None: 

209 if not self.checkmount(): 

210 return 

211 util.SMlog("Aborting GC/coalesce") 

212 cleanup.abort(self.uuid) 

213 # Change directory to avoid unmount conflicts 

214 os.chdir(SR.MOUNT_BASE) 

215 self.unmount(self.mountpoint, True) 

216 os.unlink(self.path) 

217 self.attached = False 

218 

219 @override 

220 def create(self, sr_uuid, size) -> None: 

221 if self.checkmount(): 

222 raise xs_errors.SROSError(113, 'CephFS mount point already attached') 

223 

224 try: 

225 self.mount() 

226 except CephFSException as exc: 

227 # noinspection PyBroadException 

228 try: 

229 os.rmdir(self.mountpoint) 

230 except: 

231 # we have no recovery strategy 

232 pass 

233 raise xs_errors.SROSError(111, "CephFS mount error [opterr=%s]" % exc.errstr) 

234 

235 if util.ioretry(lambda: util.pathexists(self.linkpath)): 

236 if len(util.ioretry(lambda: util.listdir(self.linkpath))) != 0: 

237 self.detach(sr_uuid) 

238 raise xs_errors.XenError('SRExists') 

239 else: 

240 try: 

241 util.ioretry(lambda: util.makedirs(self.linkpath)) 

242 os.symlink(self.linkpath, self.path) 

243 except util.CommandException as inst: 

244 if inst.code != errno.EEXIST: 

245 try: 

246 self.unmount(self.mountpoint, True) 

247 except CephFSException: 

248 util.logException('CephFSSR.unmount()') 

249 raise xs_errors.SROSError(116, 

250 "Failed to create CephFS SR. remote directory creation error: {}".format( 

251 os.strerror(inst.code))) 

252 self.detach(sr_uuid) 

253 

254 @override 

255 def delete(self, sr_uuid) -> None: 

256 # try to remove/delete non VDI contents first 

257 super(CephFSSR, self).delete(sr_uuid) 

258 try: 

259 if self.checkmount(): 

260 self.detach(sr_uuid) 

261 self.mount() 

262 if util.ioretry(lambda: util.pathexists(self.linkpath)): 

263 util.ioretry(lambda: os.rmdir(self.linkpath)) 

264 util.SMlog(str(self.unmount(self.mountpoint, True))) 

265 except util.CommandException as inst: 

266 self.detach(sr_uuid) 

267 if inst.code != errno.ENOENT: 

268 raise xs_errors.SROSError(114, "Failed to remove CephFS mount point") 

269 

270 @override 

271 def vdi(self, uuid) -> VDI.VDI: 

272 return CephFSFileVDI(self, uuid) 

273 

274 @staticmethod 

275 def _is_ceph_available(): 

276 return util.find_executable('ceph') 

277 

278class CephFSFileVDI(FileSR.FileVDI): 

279 @override 

280 def attach(self, sr_uuid, vdi_uuid) -> str: 

281 if not hasattr(self, 'xenstore_data'): 

282 self.xenstore_data = {} 

283 

284 self.xenstore_data['storage-type'] = CephFSSR.DRIVER_TYPE 

285 

286 return super(CephFSFileVDI, self).attach(sr_uuid, vdi_uuid) 

287 

288 @override 

289 def generate_config(self, sr_uuid, vdi_uuid) -> str: 

290 util.SMlog("SMBFileVDI.generate_config") 

291 if not util.pathexists(self.path): 

292 raise xs_errors.XenError('VDIUnavailable') 

293 resp = {'device_config': self.sr.dconf, 

294 'sr_uuid': sr_uuid, 

295 'vdi_uuid': vdi_uuid, 

296 'sr_sm_config': self.sr.sm_config, 

297 'command': 'vdi_attach_from_config'} 

298 # Return the 'config' encoded within a normal XMLRPC response so that 

299 # we can use the regular response/error parsing code. 

300 config = xmlrpc.client.dumps(tuple([resp]), "vdi_attach_from_config") 

301 return xmlrpc.client.dumps((config,), "", True) 

302 

303 @override 

304 def attach_from_config(self, sr_uuid, vdi_uuid) -> str: 

305 try: 

306 if not util.pathexists(self.sr.path): 

307 return self.sr.attach(sr_uuid) 

308 except: 

309 util.logException("SMBFileVDI.attach_from_config") 

310 raise xs_errors.XenError('SRUnavailable', 

311 opterr='Unable to attach from config') 

312 return '' 

313 

314if __name__ == '__main__': 314 ↛ 315line 314 didn't jump to line 315, because the condition on line 314 was never true

315 SRCommand.run(CephFSSR, DRIVER_INFO) 

316else: 

317 SR.registerSR(CephFSSR)