TISbackup/libtisbackup/base_driver.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------
# This file is part of TISBackup
#
# TISBackup is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TISBackup is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with TISBackup. If not, see <http://www.gnu.org/licenses/>.
#
# -----------------------------------------------------------------------
"""Base backup driver class and driver registry."""
import datetime
import logging
import os
import re
import shutil
import subprocess
import time
from abc import ABC, abstractmethod
from iniparse import ConfigParser
from .database import BackupStat
from .process import monitor_stdout
from .ssh import load_ssh_private_key
from .utils import dateof, datetime2isodate, isodate2datetime
try:
import paramiko
except ImportError as e:
print(("Error : can not load paramiko library %s" % e))
raise
# Nagios state constants
nagiosStateOk = 0
nagiosStateWarning = 1
nagiosStateCritical = 2
nagiosStateUnknown = 3
# Global driver registry
backup_drivers = {}
def register_driver(driverclass):
"""Register a backup driver class in the global registry."""
backup_drivers[driverclass.type] = driverclass
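# Illustrative registration pattern (hypothetical "backup_null" driver, not part
# of this module): concrete driver modules are expected to call register_driver()
# at import time so that tisbackup can look them up by their "type" string, e.g.
#
#   class backup_null(backup_generic):
#       """Do-nothing driver, for illustration only."""
#       type = "null"
#
#       def do_backup(self, stats):
#           stats["status"] = "OK"
#
#       def register_existingbackups(self):
#           pass
#
#   register_driver(backup_null)
#
# After this call, backup_drivers["null"] resolves to the backup_null class.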
class backup_generic(ABC):
"""Generic ancestor class for backups, not registered"""
type = "generic"
required_params = ["type", "backup_name", "backup_dir", "server_name", "backup_retention_time", "maximum_backup_age"]
optional_params = ["preexec", "postexec", "description", "private_key", "remote_user", "ssh_port"]
logger = logging.getLogger("tisbackup")
backup_name = ""
backup_dir = ""
server_name = ""
remote_user = "root"
description = ""
dbstat = None
dry_run = False
preexec = ""
postexec = ""
maximum_backup_age = None
backup_retention_time = None
verbose = False
private_key = ""
ssh_port = 22
def __init__(self, backup_name, backup_dir, dbstat=None, dry_run=False):
if not re.match(r"^[A-Za-z0-9_\-\.]*$", backup_name):
raise Exception("The backup name %s should contain only alphanumerical characters" % backup_name)
self.backup_name = backup_name
self.backup_dir = backup_dir
self.dbstat = dbstat
assert isinstance(self.dbstat, BackupStat) or self.dbstat is None
if not os.path.isdir(self.backup_dir):
os.makedirs(self.backup_dir)
self.dry_run = dry_run
@classmethod
def get_help(cls):
return """\
%(type)s : %(desc)s
Required params : %(required)s
Optional params : %(optional)s
""" % {"type": cls.type, "desc": cls.__doc__, "required": ",".join(cls.required_params), "optional": ",".join(cls.optional_params)}
def check_required_params(self):
for name in self.required_params:
if not hasattr(self, name) or not getattr(self, name):
raise Exception("[%s] Config Attribute %s is required" % (self.backup_name, name))
if (self.preexec or self.postexec) and (not self.private_key or not self.remote_user):
raise Exception("[%s] remote_user and private_key file required if preexec or postexec is used" % self.backup_name)
def read_config(self, iniconf):
assert isinstance(iniconf, ConfigParser)
allowed_params = self.required_params + self.optional_params
for name, value in iniconf.items(self.backup_name):
if name not in allowed_params:
self.logger.critical('[%s] Invalid param name "%s"', self.backup_name, name)
raise Exception('[%s] Invalid param name "%s"' % (self.backup_name, name))
self.logger.debug("[%s] reading param %s = %s ", self.backup_name, name, value)
setattr(self, name, value)
# if retention (in days) is not defined at section level, get default global one.
if not self.backup_retention_time:
self.backup_retention_time = iniconf.getint("global", "backup_retention_time")
# for nagios, if maximum last backup age (in hours) is not defined at section level, get default global one.
if not self.maximum_backup_age:
self.maximum_backup_age = iniconf.getint("global", "maximum_backup_age")
self.ssh_port = int(self.ssh_port)
self.backup_retention_time = int(self.backup_retention_time)
self.maximum_backup_age = int(self.maximum_backup_age)
self.check_required_params()
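# Illustrative configuration (hypothetical section name and values): read_config()
# consumes one ini section per backup; every key must appear in required_params or
# optional_params, e.g.
#
#   [www-data]
#   type = rsync+ssh
#   server_name = www.example.com
#   backup_retention_time = 30
#
# Unknown keys raise an exception, and missing backup_retention_time /
# maximum_backup_age values fall back to the [global] section.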
def do_preexec(self, stats):
self.logger.info("[%s] executing preexec %s ", self.backup_name, self.preexec)
mykey = load_ssh_private_key(self.private_key)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.server_name, username=self.remote_user, pkey=mykey, port=self.ssh_port)
tran = ssh.get_transport()
chan = tran.open_session()
# chan.set_combine_stderr(True)
chan.get_pty()
stdout = chan.makefile()
if not self.dry_run:
chan.exec_command(self.preexec)
output = stdout.read()
exit_code = chan.recv_exit_status()
self.logger.info('[%s] preexec exit code : "%i", output : %s', self.backup_name, exit_code, output)
return exit_code
else:
return 0
def do_postexec(self, stats):
self.logger.info("[%s] executing postexec %s ", self.backup_name, self.postexec)
mykey = load_ssh_private_key(self.private_key)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.server_name, username=self.remote_user, pkey=mykey, port=self.ssh_port)
tran = ssh.get_transport()
chan = tran.open_session()
# chan.set_combine_stderr(True)
chan.get_pty()
stdout = chan.makefile()
if not self.dry_run:
chan.exec_command(self.postexec)
output = stdout.read()
exit_code = chan.recv_exit_status()
self.logger.info('[%s] postexec exit code : "%i", output : %s', self.backup_name, exit_code, output)
return exit_code
else:
return 0
def do_backup(self, stats):
"""stats dict with keys : total_files_count,written_files_count,total_bytes,written_bytes"""
pass
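# A concrete driver overrides do_backup() and fills in the stats dict handed to
# it by process_backup(). A minimal hypothetical override (names and values are
# illustrative only) could look like:
#
#   def do_backup(self, stats):
#       # ... perform the actual transfer here ...
#       dest = os.path.join(self.backup_dir, self.backup_start_date)
#       stats["total_files_count"] = 1
#       stats["written_files_count"] = 1
#       stats["backup_location"] = dest
#       stats["status"] = "OK"
#       stats["log"] = "Backup of %s finished" % self.backup_name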
def check_params_connections(self):
"""Perform a dry run trying to connect without actually doing backup"""
self.check_required_params()
def process_backup(self):
"""Process the backup.
launch
- do_preexec
- do_backup
- do_postexec
returns a dict for stats
"""
self.logger.info("[%s] ######### Starting backup", self.backup_name)
starttime = time.time()
self.backup_start_date = datetime.datetime.now().strftime("%Y%m%d-%Hh%Mm%S")
if not self.dry_run and self.dbstat:
stat_rowid = self.dbstat.start(backup_name=self.backup_name, server_name=self.server_name, TYPE="BACKUP")
else:
stat_rowid = None
try:
stats = {}
stats["total_files_count"] = 0
stats["written_files_count"] = 0
stats["total_bytes"] = 0
stats["written_bytes"] = 0
stats["log"] = ""
stats["status"] = "Running"
stats["backup_location"] = None
if self.preexec.strip():
exit_code = self.do_preexec(stats)
if exit_code != 0:
raise Exception('Preexec "%s" failed with exit code "%i"' % (self.preexec, exit_code))
self.do_backup(stats)
if self.postexec.strip():
exit_code = self.do_postexec(stats)
if exit_code != 0:
raise Exception('Postexec "%s" failed with exit code "%i"' % (self.postexec, exit_code))
endtime = time.time()
duration = (endtime - starttime) / 3600.0
if not self.dry_run and self.dbstat:
self.dbstat.finish(
stat_rowid,
backup_end=datetime2isodate(datetime.datetime.now()),
backup_duration=duration,
total_files_count=stats["total_files_count"],
written_files_count=stats["written_files_count"],
total_bytes=stats["total_bytes"],
written_bytes=stats["written_bytes"],
status=stats["status"],
log=stats["log"],
backup_location=stats["backup_location"],
)
self.logger.info("[%s] ######### Backup finished : %s", self.backup_name, stats["log"])
return stats
except BaseException as e:
stats["status"] = "ERROR"
stats["log"] = str(e)
endtime = time.time()
duration = (endtime - starttime) / 3600.0
if not self.dry_run and self.dbstat:
self.dbstat.finish(
stat_rowid,
backup_end=datetime2isodate(datetime.datetime.now()),
backup_duration=duration,
total_files_count=stats["total_files_count"],
written_files_count=stats["written_files_count"],
total_bytes=stats["total_bytes"],
written_bytes=stats["written_bytes"],
status=stats["status"],
log=stats["log"],
backup_location=stats["backup_location"],
)
self.logger.error("[%s] ######### Backup finished with ERROR: %s", self.backup_name, stats["log"])
raise
def checknagios(self):
"""
Returns a tuple (nagiosstatus,message) for the current backup_name
Read status from dbstat database
"""
if not self.dbstat:
self.logger.warn("[%s] checknagios : no database provided", self.backup_name)
return ("No database provided", nagiosStateUnknown)
else:
self.logger.debug(
'[%s] checknagios : sql query "%s" %s',
self.backup_name,
"select status, backup_end, log from stats where TYPE='BACKUP' AND backup_name=? order by backup_end desc limit 30",
self.backup_name,
)
q = self.dbstat.query(
"select status, backup_start, backup_end, log, backup_location, total_bytes from stats where TYPE='BACKUP' AND backup_name=? order by backup_start desc limit 30",
(self.backup_name,),
)
if not q:
self.logger.debug("[%s] checknagios : no result from query", self.backup_name)
return (nagiosStateCritical, "CRITICAL : No backup found for %s in database" % self.backup_name)
else:
mindate = datetime2isodate((datetime.datetime.now() - datetime.timedelta(hours=self.maximum_backup_age)))
self.logger.debug("[%s] checknagios : looking for most recent OK not older than %s", self.backup_name, mindate)
for b in q:
if b["backup_end"] >= mindate and b["status"] == "OK":
# check if backup actually exists on registered backup location and is newer than backup start date
if b["total_bytes"] == 0:
return (nagiosStateWarning, "WARNING : No data to backup was found for %s" % (self.backup_name,))
if not b["backup_location"]:
return (
nagiosStateWarning,
"WARNING : No Backup location found for %s finished on (%s) %s"
% (self.backup_name, isodate2datetime(b["backup_end"]), b["log"]),
)
if os.path.isfile(b["backup_location"]):
backup_actual_date = datetime.datetime.fromtimestamp(os.stat(b["backup_location"]).st_ctime)
if backup_actual_date + datetime.timedelta(hours=1) > isodate2datetime(b["backup_start"]):
return (
nagiosStateOk,
"OK Backup %s (%s), %s" % (self.backup_name, isodate2datetime(b["backup_end"]), b["log"]),
)
else:
return (
nagiosStateCritical,
"CRITICAL Backup %s (%s), %s seems older than start of backup"
% (self.backup_name, isodate2datetime(b["backup_end"]), b["log"]),
)
elif os.path.isdir(b["backup_location"]):
return (
nagiosStateOk,
"OK Backup %s (%s), %s" % (self.backup_name, isodate2datetime(b["backup_end"]), b["log"]),
)
elif self.type == "copy-vm-xcp":
return (
nagiosStateOk,
"OK Backup %s (%s), %s" % (self.backup_name, isodate2datetime(b["backup_end"]), b["log"]),
)
else:
return (
nagiosStateCritical,
"CRITICAL Backup %s (%s), %s has disapeared from backup location %s"
% (self.backup_name, isodate2datetime(b["backup_end"]), b["log"], b["backup_location"]),
)
self.logger.debug(
"[%s] checknagios : looking for most recent Warning or Running not older than %s", self.backup_name, mindate
)
for b in q:
if b["backup_end"] >= mindate and b["status"] in ("Warning", "Running"):
return (nagiosStateWarning, "WARNING : Backup %s still running or warning. %s" % (self.backup_name, b["log"]))
self.logger.debug("[%s] checknagios : No Ok or warning recent backup found", self.backup_name)
return (nagiosStateCritical, "CRITICAL : No recent backup for %s" % self.backup_name)
def cleanup_backup(self):
"""Removes obsolete backups (older than backup_retention_time)"""
mindate = datetime2isodate((dateof(datetime.datetime.now()) - datetime.timedelta(days=self.backup_retention_time)))
# check if there is at least 1 "OK" backup left after cleanup :
ok_backups = self.dbstat.query(
'select backup_location from stats where TYPE="BACKUP" and backup_name=? and backup_start>=? and status="OK" order by backup_start desc',
(self.backup_name, mindate),
)
removed = []
if ok_backups and os.path.exists(ok_backups[0]["backup_location"]):
records = self.dbstat.query(
'select status, backup_start, backup_end, log, backup_location from stats where backup_name=? and backup_start<? and backup_location is not null and TYPE="BACKUP" order by backup_start',
(self.backup_name, mindate),
)
if records:
for oldbackup_location in [rec["backup_location"] for rec in records if rec["backup_location"]]:
try:
if os.path.isdir(oldbackup_location) and self.backup_dir in oldbackup_location:
self.logger.info('[%s] removing directory "%s"', self.backup_name, oldbackup_location)
if not self.dry_run:
if self.type == "rsync+btrfs+ssh" or self.type == "rsync+btrfs":
cmd = "/bin/btrfs subvolume delete %s" % oldbackup_location
process = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True
)
log = monitor_stdout(process, "", self)
returncode = process.returncode
if returncode != 0:
self.logger.error("[" + self.backup_name + "] shell program exited with error code: %s" % log)
raise Exception(
"[" + self.backup_name + "] shell program exited with error code " + str(returncode), cmd
)
else:
self.logger.info(
"[" + self.backup_name + "] deleting snapshot volume: %s" % oldbackup_location
)
else:
shutil.rmtree(oldbackup_location)
if os.path.isfile(oldbackup_location) and self.backup_dir in oldbackup_location:
self.logger.debug('[%s] removing file "%s"', self.backup_name, oldbackup_location)
if not self.dry_run:
os.remove(oldbackup_location)
self.logger.debug('Cleanup_backup : Removing records from DB : [%s]-"%s"', self.backup_name, oldbackup_location)
if not self.dry_run:
self.dbstat.db.execute(
'update stats set TYPE="CLEAN" where backup_name=? and backup_location=?',
(self.backup_name, oldbackup_location),
)
self.dbstat.db.commit()
except BaseException as e:
self.logger.error('cleanup_backup : Unable to remove directory/file "%s". Error %s', oldbackup_location, e)
removed.append((self.backup_name, oldbackup_location))
else:
self.logger.debug("[%s] cleanup : no result for query", self.backup_name)
else:
self.logger.info("Nothing to do because we want to keep at least one OK backup after cleaning")
self.logger.info(
"[%s] Cleanup finished : removed : %s", self.backup_name, ",".join([('[%s]-"%s"') % r for r in removed]) or "Nothing"
)
return removed
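# Retention example (illustrative numbers): with backup_retention_time = 30,
# mindate is the date 30 days before today; any backup started before that date
# is removed from disk and its stats row re-flagged as TYPE="CLEAN", but only as
# long as at least one more recent backup with status "OK" is still present.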
@abstractmethod
def register_existingbackups(self):
pass
# """scan existing backups and insert stats in database"""
# registered = [b['backup_location'] for b in self.dbstat.query('select distinct backup_location from stats where backup_name=?',[self.backup_name])]
# raise Exception('Abstract method')
def export_latestbackup(self, destdir):
"""Copy (rsync) latest OK backup to external storage located at locally mounted "destdir" """
stats = {}
stats["total_files_count"] = 0
stats["written_files_count"] = 0
stats["total_bytes"] = 0
stats["written_bytes"] = 0
stats["log"] = ""
stats["status"] = "Running"
if not self.dbstat:
self.logger.critical("[%s] export_latestbackup : no database provided", self.backup_name)
raise Exception("No database")
else:
latest_sql = """\
select status, backup_start, backup_end, log, backup_location, total_bytes
from stats
where backup_name=? and status='OK' and TYPE='BACKUP'
order by backup_start desc limit 30"""
self.logger.debug('[%s] export_latestbackup : sql query "%s" %s', self.backup_name, latest_sql, self.backup_name)
q = self.dbstat.query(latest_sql, (self.backup_name,))
if not q:
self.logger.debug("[%s] export_latestbackup : no result from query", self.backup_name)
raise Exception("No OK backup found for %s in database" % self.backup_name)
else:
latest = q[0]
backup_source = latest["backup_location"]
backup_dest = os.path.join(os.path.abspath(destdir), self.backup_name)
if not os.path.exists(backup_source):
raise Exception("Backup source %s doesn't exists" % backup_source)
# ensure there is a slash at end
if os.path.isdir(backup_source) and backup_source[-1] != "/":
backup_source += "/"
if backup_dest[-1] != "/":
backup_dest += "/"
if not os.path.isdir(backup_dest):
os.makedirs(backup_dest)
options = ["-aP", "--stats", "--delete-excluded", "--numeric-ids", "--delete-after"]
if self.logger.level:
options.append("-P")
if self.dry_run:
options.append("-d")
options_params = " ".join(options)
cmd = "/usr/bin/rsync %s %s %s 2>&1" % (options_params, backup_source, backup_dest)
self.logger.debug("[%s] rsync : %s", self.backup_name, cmd)
if not self.dry_run:
self.line = ""
starttime = time.time()
stat_rowid = self.dbstat.start(backup_name=self.backup_name, server_name=self.server_name, TYPE="EXPORT")
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
def ondata(data, context):
if context.verbose:
print(data)
context.logger.debug(data)
log = monitor_stdout(process, ondata, self)
for l in log.splitlines():
if l.startswith("Number of files:"):
stats["total_files_count"] += int(re.findall("[0-9]+", l.split(":")[1])[0])
if l.startswith("Number of files transferred:"):
stats["written_files_count"] += int(l.split(":")[1])
if l.startswith("Total file size:"):
stats["total_bytes"] += float(l.replace(",", "").split(":")[1].split()[0])
if l.startswith("Total transferred file size:"):
stats["written_bytes"] += float(l.replace(",", "").split(":")[1].split()[0])
returncode = process.returncode
## deal with exit code 24 (file vanished)
if returncode == 24:
self.logger.warning("[" + self.backup_name + "] Note: some files vanished before transfer")
elif returncode == 23:
self.logger.warning("[" + self.backup_name + "] unable so set uid on some files")
elif returncode != 0:
self.logger.error("[" + self.backup_name + "] shell program exited with error code ")
raise Exception("[" + self.backup_name + "] shell program exited with error code " + str(returncode), cmd)
else:
print(cmd)
stats["status"] = "OK"
self.logger.info(
"export backup from %s to %s OK, %d bytes written for %d changed files"
% (backup_source, backup_dest, stats["written_bytes"], stats["written_files_count"])
)
endtime = time.time()
duration = (endtime - starttime) / 3600.0
if not self.dry_run and self.dbstat:
self.dbstat.finish(
stat_rowid,
backup_end=datetime2isodate(datetime.datetime.now()),
backup_duration=duration,
total_files_count=stats["total_files_count"],
written_files_count=stats["written_files_count"],
total_bytes=stats["total_bytes"],
written_bytes=stats["written_bytes"],
status=stats["status"],
log=stats["log"],
backup_location=backup_dest,
)
return stats