Coverage for drivers/CephFSSR.py : 21%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2#
3# Original work copyright (C) Citrix systems
4# Modified work copyright (C) Vates SAS and XCP-ng community
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU Lesser General Public License as published
8# by the Free Software Foundation; version 2.1 only.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU Lesser General Public License for more details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18#
19# CEPHFSSR: Based on FileSR, mounts ceph fs share
21from sm_typing import override
23import errno
24import os
25import socket
26import syslog as _syslog
27import xmlrpc.client
28from syslog import syslog
30# careful with the import order here
31# FileSR has a circular dependency:
32# FileSR -> blktap2 -> lvutil -> EXTSR -> FileSR
33# importing in this order seems to avoid triggering the issue.
34import SR
35import SRCommand
36import FileSR
37# end of careful
38import VDI
39import cleanup
40import lock
41import util
42import xs_errors
# Capabilities advertised by this driver (exposed through DRIVER_INFO).
CAPABILITIES = [
    "SR_PROBE",
    "SR_UPDATE",
    "VDI_CREATE",
    "VDI_DELETE",
    "VDI_ATTACH",
    "VDI_DETACH",
    "VDI_UPDATE",
    "VDI_CLONE",
    "VDI_SNAPSHOT",
    "VDI_RESIZE",
    "VDI_MIRROR",
    "VDI_GENERATE_CONFIG",
    "VDI_RESET_ON_BOOT/2",
    "ATOMIC_PAUSE",
]

# Device-config keys understood by the driver, as [key, description] pairs.
CONFIGURATION = [
    [
        'server',
        'Ceph server(s) (required, ex: "192.168.0.12" or "10.10.10.10,10.10.10.26")',
    ],
    [
        'serverpath',
        'Ceph FS path (required, ex: "/")',
    ],
    [
        'serverport',
        'ex: 6789',
    ],
    [
        'options',
        'Ceph FS client name, and secretfile (required, ex: "name=admin,secretfile=/etc/ceph/admin.secret")',
    ],
]

# Driver description handed to SRCommand.run() at the bottom of the file.
DRIVER_INFO = {
    'name': 'CephFS VHD and QCOW2',
    'description': 'SR plugin which stores disks as VHD and QCOW2 files on a CephFS storage',
    'vendor': 'Vates SAS',
    'copyright': '(C) 2020 Vates SAS',
    'driver_version': '1.0',
    'required_api_version': '1.0',
    'capabilities': CAPABILITIES,
    'configuration': CONFIGURATION,
}

# Stored on each SR instance as `driver_config` by CephFSSR.load().
DRIVER_CONFIG = {"ATTACH_FROM_CONFIG_WITH_TAPDISK": True}
# Directory on which the share is temporarily mounted during an sr_probe.
# xapi guarantees that probes are serialised, so one shared mountpoint is
# sufficient.
PROBE_MOUNTPOINT = os.path.join(SR.MOUNT_BASE, "probe")
class CephFSException(Exception):
    """Error raised by the CephFS mount/unmount helpers.

    The human-readable message is kept in ``errstr``, which the SR methods
    read when converting this error into an xs_errors exception.
    """

    def __init__(self, errstr):
        super().__init__(errstr)
        self.errstr = errstr
80# mountpoint = /var/run/sr-mount/CephFS/uuid
81# linkpath = mountpoint/uuid - path to SR directory on share
82# path = /var/run/sr-mount/uuid - symlink to SR directory on share
class CephFSSR(FileSR.FileSR):
    """Ceph file-based storage repository.

    Paths computed by load():
      mountpoint -- SR.MOUNT_BASE/CephFS/<sr_uuid>, where the share is mounted
      linkpath   -- <mountpoint>/<sr_uuid>, the SR directory on the share
      path       -- SR.MOUNT_BASE/<sr_uuid>, symlink pointing at linkpath
    """

    DRIVER_TYPE = 'cephfs'

    @override
    @staticmethod
    def handles(sr_type) -> bool:
        """Return True when `sr_type` is one this driver accepts."""
        # fudge, because the parent class (FileSR) checks for smb to alter its behavior
        return sr_type in (CephFSSR.DRIVER_TYPE, 'smb')

    @override
    def load(self, sr_uuid) -> None:
        """Validate the device config and derive the SR paths.

        Raises:
            xs_errors.XenError: 'SRUnavailable' when the ceph executable is
                not found, 'ConfigServerMissing' / 'ConfigServerPathMissing'
                when the corresponding device-config key is absent.
        """
        if not self._is_ceph_available():
            raise xs_errors.XenError(
                'SRUnavailable',
                opterr='ceph is not installed'
            )

        self.ops_exclusive = FileSR.OPS_EXCLUSIVE
        self.lock = lock.Lock(lock.LOCK_TYPE_SR, self.uuid)
        self.sr_vditype = SR.DEFAULT_TAP
        self.driver_config = DRIVER_CONFIG

        if 'server' not in self.dconf:
            raise xs_errors.XenError('ConfigServerMissing')
        # 'serverpath' is documented as required; report it the same way as
        # a missing 'server' instead of failing with a bare KeyError.
        if 'serverpath' not in self.dconf:
            raise xs_errors.XenError('ConfigServerPathMissing')
        self.remoteserver = self.dconf['server']
        self.remotepath = self.dconf['serverpath']
        # if serverport is not specified, use default 6789
        self.remoteport = self.dconf.get('serverport', "6789")

        if self.sr_ref and self.session is not None:
            self.sm_config = self.session.xenapi.SR.get_sm_config(self.sr_ref)
        else:
            self.sm_config = self.srcmd.params.get('sr_sm_config') or {}

        self.mountpoint = os.path.join(SR.MOUNT_BASE, 'CephFS', sr_uuid)
        self.linkpath = os.path.join(self.mountpoint, sr_uuid or "")
        self.path = os.path.join(SR.MOUNT_BASE, sr_uuid)
        self._check_o_direct()

    def checkmount(self):
        """Return a truthy value when the SR is fully attached.

        Fully attached means: the mountpoint exists, it is a mount, and the
        `path` symlink exists.
        """
        return util.ioretry(lambda: ((util.pathexists(self.mountpoint) and
                                      util.ismount(self.mountpoint)) and
                                     util.pathexists(self.path)))

    def _resolve_servers(self):
        """Return the configured servers that resolve, IPv6 ones in brackets."""
        resolved = []
        for server in self.remoteserver.split(','):
            try:
                family = socket.getaddrinfo(server, 0)[0][0]
            except Exception:
                # Deliberately broad: an entry that cannot be resolved is
                # skipped so that the remaining servers can still be used.
                util.SMlog("CephFS: skipping unresolvable server '%s'" % server)
                continue
            resolved.append('[' + server + ']' if family == socket.AF_INET6 else server)
        return resolved

    def mount(self, mountpoint=None):
        """Mount the remote ceph export at 'mountpoint'.

        Args:
            mountpoint: directory to mount on; defaults to `self.mountpoint`.

        Raises:
            CephFSException: invalid mountpoint, no resolvable server, the
                directory could not be created, the mount command failed, or
                the mounted share could not be listed.
        """
        if mountpoint is None:
            mountpoint = self.mountpoint
        elif not util.is_string(mountpoint) or mountpoint == "":
            raise CephFSException("mountpoint not a string object")

        # Fail early, before touching the filesystem, rather than handing
        # the mount command a source with an empty host list.
        servers = self._resolve_servers()
        if not servers:
            raise CephFSException(
                "none of the configured servers could be resolved: %s" % self.remoteserver)

        try:
            if not util.ioretry(lambda: util.isdir(mountpoint)):
                util.ioretry(lambda: util.makedirs(mountpoint))
        except util.CommandException as inst:
            raise CephFSException("Failed to make directory: code is %d" % inst.code)

        options = []
        if 'options' in self.dconf:
            options = ['-o', self.dconf['options']]

        # NOTE(review): the port is appended once, after the last server
        # only -- confirm this is the intended source syntax when a
        # non-default serverport is used with several servers.
        source = ','.join(servers) + ":" + self.remoteport + ":" + self.remotepath
        command = ["mount", '-t', 'ceph', source, mountpoint] + options
        try:
            util.ioretry(lambda: util.pread(command), errlist=[errno.EPIPE, errno.EIO], maxretry=2, nofail=True)
        except util.CommandException as inst:
            syslog(_syslog.LOG_ERR, 'CephFS mount failed ' + inst.__str__())
            raise CephFSException("mount failed with return code %d" % inst.code)

        # Sanity check to ensure that the user has at least RO access to the
        # mounted share; undo the mount if the share cannot be listed.
        try:
            util.listdir(mountpoint)
        except util.CommandException:
            self._unmount_quietly(mountpoint)
            raise CephFSException("Permission denied. Please check user privileges.")

    def unmount(self, mountpoint, rmmountpoint):
        """Unmount `mountpoint`, then remove the directory if `rmmountpoint`.

        Raises:
            CephFSException: the umount command or the rmdir failed.
        """
        try:
            util.pread(["umount", mountpoint])
        except util.CommandException as inst:
            raise CephFSException("umount failed with return code %d" % inst.code)
        if rmmountpoint:
            try:
                os.rmdir(mountpoint)
            except OSError as inst:
                raise CephFSException("rmdir failed with error '%s'" % inst.strerror)

    def _unmount_quietly(self, mountpoint):
        """Best-effort unmount and rmdir of `mountpoint`; failures are logged only."""
        try:
            self.unmount(mountpoint, True)
        except CephFSException:
            util.logException('CephFSSR.unmount()')

    def _release_mount(self, sr_uuid):
        """Tear down whatever create()/delete() left mounted.

        detach() returns early when checkmount() is false, which is the case
        whenever the share is mounted but the `path` symlink was never
        created. In that situation unmount directly so the mount is not
        leaked.
        """
        if self.checkmount():
            self.detach(sr_uuid)
        else:
            self._unmount_quietly(self.mountpoint)

    @override
    def attach(self, sr_uuid) -> None:
        """Mount the share and create the `path` symlink if not yet attached.

        Raises:
            xs_errors.SROSError: code 12 when mounting fails.
        """
        if not self.checkmount():
            try:
                self.mount()
                os.symlink(self.linkpath, self.path)
            except CephFSException as exc:
                raise xs_errors.SROSError(12, exc.errstr)
        self.attached = True

    @override
    def probe(self) -> str:
        """Mount the share on the probe mountpoint and report the SR uuids found.

        Returns:
            The result of util.SRtoXML() for the directory entries that
            satisfy util.match_uuid().
        """
        self.mount(PROBE_MOUNTPOINT)
        try:
            entries = util.listdir(PROBE_MOUNTPOINT)
        except util.CommandException:
            # Do not leave the probe mount behind when listing fails.
            self._unmount_quietly(PROBE_MOUNTPOINT)
            raise
        self.unmount(PROBE_MOUNTPOINT, True)

        sr_uuids = [entry for entry in entries if util.match_uuid(entry)]
        # Create a dictionary from the SR uuids to feed SRtoXML()
        return util.SRtoXML({sr_uuid: {} for sr_uuid in sr_uuids})

    @override
    def detach(self, sr_uuid) -> None:
        """Abort GC/coalesce, unmount the share and remove the `path` symlink.

        Does nothing when checkmount() reports the SR is not attached.
        """
        if not self.checkmount():
            return
        util.SMlog("Aborting GC/coalesce")
        cleanup.abort(self.uuid)
        # Change directory to avoid unmount conflicts
        os.chdir(SR.MOUNT_BASE)
        self.unmount(self.mountpoint, True)
        os.unlink(self.path)
        self.attached = False

    @override
    def create(self, sr_uuid, size) -> None:
        """Create the SR directory on the share, leaving the share unmounted.

        Raises:
            xs_errors.SROSError: 113 if already attached, 111 if the mount
                fails, 116 if the SR directory cannot be created.
            xs_errors.XenError: 'SRExists' if the SR directory already exists
                and is not empty.
        """
        if self.checkmount():
            raise xs_errors.SROSError(113, 'CephFS mount point already attached')

        try:
            self.mount()
        except CephFSException as exc:
            try:
                os.rmdir(self.mountpoint)
            except OSError as err:
                # we have no recovery strategy
                util.SMlog("CephFS: could not remove mountpoint: %s" % err)
            raise xs_errors.SROSError(111, "CephFS mount error [opterr=%s]" % exc.errstr)

        if util.ioretry(lambda: util.pathexists(self.linkpath)):
            if len(util.ioretry(lambda: util.listdir(self.linkpath))) != 0:
                self._release_mount(sr_uuid)
                raise xs_errors.XenError('SRExists')
        else:
            try:
                util.ioretry(lambda: util.makedirs(self.linkpath))
                os.symlink(self.linkpath, self.path)
            except util.CommandException as inst:
                if inst.code != errno.EEXIST:
                    self._unmount_quietly(self.mountpoint)
                    raise xs_errors.SROSError(116,
                        "Failed to create CephFS SR. remote directory creation error: {}".format(
                            os.strerror(inst.code)))
        self._release_mount(sr_uuid)

    @override
    def delete(self, sr_uuid) -> None:
        """Delete the SR contents, then remove the SR directory from the share.

        Raises:
            xs_errors.SROSError: 114 when removal fails with an error other
                than ENOENT.
        """
        # try to remove/delete non VDI contents first
        super().delete(sr_uuid)
        try:
            if self.checkmount():
                self.detach(sr_uuid)
            self.mount()
            if util.ioretry(lambda: util.pathexists(self.linkpath)):
                util.ioretry(lambda: os.rmdir(self.linkpath))
            self.unmount(self.mountpoint, True)
        except util.CommandException as inst:
            self._release_mount(sr_uuid)
            if inst.code != errno.ENOENT:
                raise xs_errors.SROSError(114, "Failed to remove CephFS mount point")

    @override
    def vdi(self, uuid) -> VDI.VDI:
        """Return the VDI object of this SR for `uuid`."""
        return CephFSFileVDI(self, uuid)

    @staticmethod
    def _is_ceph_available():
        """Return the result of looking up the 'ceph' executable."""
        return util.find_executable('ceph')
class CephFSFileVDI(FileSR.FileVDI):
    """File-based VDI stored on a CephFS SR."""

    @override
    def attach(self, sr_uuid, vdi_uuid) -> str:
        """Tag the VDI's xenstore data with the storage type, then attach it."""
        if not hasattr(self, 'xenstore_data'):
            self.xenstore_data = {}

        self.xenstore_data['storage-type'] = CephFSSR.DRIVER_TYPE

        return super().attach(sr_uuid, vdi_uuid)

    @override
    def generate_config(self, sr_uuid, vdi_uuid) -> str:
        """Return an XMLRPC-encoded 'vdi_attach_from_config' request for this VDI.

        Raises:
            xs_errors.XenError: 'VDIUnavailable' when the VDI path does not exist.
        """
        util.SMlog("CephFSFileVDI.generate_config")
        if not util.pathexists(self.path):
            raise xs_errors.XenError('VDIUnavailable')
        resp = {'device_config': self.sr.dconf,
                'sr_uuid': sr_uuid,
                'vdi_uuid': vdi_uuid,
                'sr_sm_config': self.sr.sm_config,
                'command': 'vdi_attach_from_config'}
        # Return the 'config' encoded within a normal XMLRPC response so that
        # we can use the regular response/error parsing code.
        config = xmlrpc.client.dumps((resp,), "vdi_attach_from_config")
        return xmlrpc.client.dumps((config,), "", True)

    @override
    def attach_from_config(self, sr_uuid, vdi_uuid) -> str:
        """Attach the SR if its path is not present yet.

        Returns:
            Always the empty string.

        Raises:
            xs_errors.XenError: 'SRUnavailable' when the check or the SR
                attach fails.
        """
        try:
            if not util.pathexists(self.sr.path):
                self.sr.attach(sr_uuid)
        except Exception:
            util.logException("CephFSFileVDI.attach_from_config")
            raise xs_errors.XenError('SRUnavailable',
                                     opterr='Unable to attach from config')
        return ''
# Imported as a module: register the driver. Executed as a script: hand
# control to the SR command dispatcher.
if __name__ != '__main__':
    SR.registerSR(CephFSSR)
else:
    SRCommand.run(CephFSSR, DRIVER_INFO)