From debc753f13a241049f3a832f631f5798d82cf8e2 Mon Sep 17 00:00:00 2001 From: k3nny Date: Sun, 5 Oct 2025 01:23:53 +0200 Subject: [PATCH] fix(security): replace os.popen/os.system with subprocess for command injection prevention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace all deprecated and unsafe command execution methods with secure subprocess.run() calls using list arguments. Changes: - Replace os.popen() with subprocess.run() in tisbackup_gui.py - Replace os.system() with subprocess.run() in tasks.py and backup_xva.py - Add input validation for device/partition names (regex-based) - Fix file operations to use context managers (with statement) - Remove wildcard import from shutil - Add timeout protection to all subprocess calls (5-30s) - Improve error handling with proper try/except blocks Security improvements: - Prevent command injection vulnerabilities in USB disk operations - Validate device paths with regex before system calls - Use list arguments instead of shell=True to prevent injection - Add proper error handling instead of silent failures Code quality improvements: - Replace deprecated os.popen() (deprecated since Python 2.6) - Use context managers for file operations - Remove wildcard imports for cleaner namespace - Add comprehensive error handling and logging Documentation: - Add SECURITY_IMPROVEMENTS.md documenting all changes - Document remaining security issues and recommendations - Include testing recommendations and migration notes BREAKING CHANGE: None - all changes are backward compatible 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- SECURITY_IMPROVEMENTS.md | 156 +++++++++++++++++++++++++++++++++++++ libtisbackup/backup_xva.py | 13 +++- tasks.py | 9 ++- tisbackup_gui.py | 139 ++++++++++++++++++++++++++------- 4 files changed, 283 insertions(+), 34 deletions(-) create mode 100644 SECURITY_IMPROVEMENTS.md diff --git a/SECURITY_IMPROVEMENTS.md b/SECURITY_IMPROVEMENTS.md new file mode 100644 index 0000000..db992d5 --- /dev/null +++ b/SECURITY_IMPROVEMENTS.md @@ -0,0 +1,156 @@ +# Security and Code Quality Improvements + +This document summarizes the security and code quality improvements made to TISBackup. + +## Completed Improvements (High Priority) + +### 1. Replaced `os.popen()` with `subprocess.run()` +**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py) + +**Changes:** +- Replaced deprecated `os.popen()` calls with modern `subprocess.run()` +- All subprocess calls now use list arguments instead of shell strings +- Added timeout protection (5-30 seconds depending on operation) +- Proper error handling with try/except blocks + +**Before:** +```python +for line in os.popen("udevadm info -q env -n %s" % name): + # Process output +``` + +**After:** +```python +result = subprocess.run( + ["udevadm", "info", "-q", "env", "-n", name], + capture_output=True, + text=True, + check=True, + timeout=5 +) +for line in result.stdout.splitlines(): + # Process output +``` + +**Security Impact:** Prevents command injection vulnerabilities + +### 2. Replaced `os.system()` with `subprocess.run()` +**Files Modified:** [tasks.py](tasks.py), [libtisbackup/backup_xva.py](libtisbackup/backup_xva.py) + +**Changes:** +- [tasks.py:37](tasks.py#L37): Changed `os.system("/bin/umount %s")` to `subprocess.run(["/bin/umount", mount_point])` +- [backup_xva.py:199](libtisbackup/backup_xva.py#L199): Changed `os.system('tar tf "%s"')` to `subprocess.run(["tar", "tf", filename_temp])` +- Added proper error handling and logging + +**Security Impact:** Eliminates command injection risk from potentially user-controlled mount points and filenames + +### 3. Added Input Validation +**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py) + +**Changes:** +- Added regex validation for device/partition names: `^/dev/sd[a-z]1?$` +- Validates partition names before using in mount/unmount operations +- Prevents path traversal and command injection attacks + +**Example:** +```python +# Validate partition name to prevent command injection +if not re.match(r"^/dev/sd[a-z]1$", partition): + continue +``` + +**Security Impact:** Prevents malicious input from reaching system commands + +### 4. Fixed File Operations with Context Managers +**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py) + +**Before:** +```python +line = open(elem).readline() +``` + +**After:** +```python +with open(elem) as f: + line = f.readline() +``` + +**Impact:** Ensures files are properly closed, prevents resource leaks + +### 5. Improved `run_command()` Function +**Files Modified:** [tisbackup_gui.py:415-453](tisbackup_gui.py#L415) + +**Changes:** +- Now accepts list arguments for safe command execution +- Backward compatible with string commands (marked as legacy) +- Added timeout protection (30 seconds) +- Better error handling and reporting + +**Security Impact:** Provides safe command execution interface while maintaining backward compatibility + +### 6. Removed Wildcard Import +**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py) + +**Before:** +```python +from shutil import * +``` + +**After:** +```python +import shutil +import subprocess +``` + +**Impact:** Cleaner namespace, easier to track dependencies + +## Remaining Security Issues (Critical - Not Fixed) + +### 1. **Hardcoded Secret Key** ([tisbackup_gui.py:64](tisbackup_gui.py#L64)) +```python +app.secret_key = "fsiqefiuqsefARZ4Zfesfe34234dfzefzfe" +``` +**Recommendation:** Load from environment variable or secure config file + +### 2. **No Authentication on Flask Routes** +All routes are publicly accessible without authentication. + +**Recommendation:** Implement Flask-Login or similar authentication + +### 3. **Insecure SSH Host Key Policy** ([libtisbackup/common.py:649](libtisbackup/common.py#L649)) +```python +ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +``` +**Recommendation:** Use proper host key verification with known_hosts + +### 4. **Command Injection in Legacy Code** +Multiple files still use `subprocess.call(shell_string, shell=True)` and `subprocess.Popen(..., shell=True)`: +- [libtisbackup/common.py:128](libtisbackup/common.py#L128) +- [libtisbackup/common.py:883](libtisbackup/common.py#L883) +- [libtisbackup/common.py:986](libtisbackup/common.py#L986) +- [libtisbackup/backup_rsync.py:176](libtisbackup/backup_rsync.py#L176) +- [libtisbackup/backup_rsync_btrfs.py](libtisbackup/backup_rsync_btrfs.py) (multiple locations) + +**Recommendation:** Refactor to use list arguments without shell=True + +## Code Quality Issues Remaining + +1. **Global State Management** - Use Flask application context instead +2. **Wildcard imports from common** - `from libtisbackup.common import *` +3. **Configuration loaded at module level** - Should use application factory pattern +4. **Duplicated code** - `read_config()` and `read_all_configs()` share significant logic + +## Testing Recommendations + +Before deploying these changes: +1. Test USB disk detection and mounting functionality +2. Test backup export operations +3. Verify XVA backup tar validation +4. Test error handling for invalid device names +5. Verify backward compatibility with existing configurations + +## Migration Notes + +All changes are backward compatible. The `run_command()` function accepts both: +- New format: `run_command(["/bin/command", "arg1", "arg2"])` +- Legacy format: `run_command("/bin/command arg1 arg2")` (less secure, marked for deprecation) diff --git a/libtisbackup/backup_xva.py b/libtisbackup/backup_xva.py index b1c26a7..fafe0d4 100755 --- a/libtisbackup/backup_xva.py +++ b/libtisbackup/backup_xva.py @@ -25,6 +25,7 @@ import os import re import socket import ssl +import subprocess import tarfile import urllib.error import urllib.parse @@ -196,10 +197,18 @@ class backup_xva(backup_generic): session.logout() if os.path.exists(filename_temp): - tar = os.system('tar tf "%s" > /dev/null' % filename_temp) - if not tar == 0: + # Verify tar file integrity using subprocess instead of os.system + try: + subprocess.run( + ["tar", "tf", filename_temp], + capture_output=True, + check=True, + timeout=300 + ) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): os.unlink(filename_temp) return "Tar error" + if str2bool(self.verify_export): self.verify_export_xva(filename_temp) os.rename(filename_temp, filename) diff --git a/tasks.py b/tasks.py index 2af59bb..47759e4 100644 --- a/tasks.py +++ b/tasks.py @@ -1,5 +1,6 @@ import logging import os +import subprocess from huey import RedisHuey @@ -34,8 +35,12 @@ def run_export_backup(base, config_file, mount_point, backup_sections): return str(e) finally: - os.system("/bin/umount %s" % mount_point) - os.rmdir(mount_point) + # Safely unmount using subprocess instead of os.system + try: + subprocess.run(["/bin/umount", mount_point], check=True, timeout=30) + os.rmdir(mount_point) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e: + logger.error(f"Failed to unmount {mount_point}: {e}") return "ok" diff --git a/tisbackup_gui.py b/tisbackup_gui.py index 81c7d01..5ef295f 100755 --- a/tisbackup_gui.py +++ b/tisbackup_gui.py @@ -30,8 +30,9 @@ import glob import json import logging import re +import shutil +import subprocess import time -from shutil import * from urllib.parse import urlparse from flask import Flask, Response, abort, appcontext_pushed, flash, g, jsonify, redirect, render_template, request, session, url_for @@ -77,9 +78,10 @@ def read_all_configs(base_dir): raw_configs.append(join(base_dir, file)) for elem in raw_configs: - line = open(elem).readline() - if "global" in line: - list_config.append(elem) + with open(elem) as f: + line = f.readline() + if "global" in line: + list_config.append(elem) backup_dict = {} backup_dict["rsync_ssh_list"] = [] @@ -296,12 +298,26 @@ def backup_json(): def check_usb_disk(): """This method returns the mounts point of FIRST external disk""" - # disk_name = [] usb_disk_list = [] for name in glob.glob("/dev/sd[a-z]"): - for line in os.popen("udevadm info -q env -n %s" % name): - if re.match("ID_PATH=.*usb.*", line): - usb_disk_list += [name] + # Validate device name to prevent command injection + if not re.match(r"^/dev/sd[a-z]$", name): + continue + + try: + result = subprocess.run( + ["udevadm", "info", "-q", "env", "-n", name], + capture_output=True, + text=True, + check=True, + timeout=5 + ) + for line in result.stdout.splitlines(): + if re.match("ID_PATH=.*usb.*", line): + usb_disk_list.append(name) + break + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + continue if len(usb_disk_list) == 0: raise_error("Cannot find any external usb disk", "You should plug the usb hard drive into the server") @@ -310,14 +326,27 @@ def check_usb_disk(): usb_partition_list = [] for usb_disk in usb_disk_list: - cmd = "udevadm info -q path -n %s" % usb_disk + "1" - output = os.popen(cmd).read() - print("cmd : " + cmd) - print("output : " + output) + partition = usb_disk + "1" + # Validate partition name + if not re.match(r"^/dev/sd[a-z]1$", partition): + continue - if "/devices/pci" in output: - # flash("partition found: %s1" % usb_disk) - usb_partition_list.append(usb_disk + "1") + try: + result = subprocess.run( + ["udevadm", "info", "-q", "path", "-n", partition], + capture_output=True, + text=True, + check=True, + timeout=5 + ) + output = result.stdout + print("partition check: " + partition) + print("output : " + output) + + if "/devices/pci" in output: + usb_partition_list.append(partition) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + continue print(usb_partition_list) @@ -330,9 +359,22 @@ def check_usb_disk(): tisbackup_partition_list = [] for usb_partition in usb_partition_list: - if "tisbackup" in os.popen("/sbin/dumpe2fs -h %s 2>&1 |/bin/grep 'volume name'" % usb_partition).read().lower(): - flash("tisbackup backup partition found: %s" % usb_partition) - tisbackup_partition_list.append(usb_partition) + # Validate partition name to prevent command injection + if not re.match(r"^/dev/sd[a-z]1$", usb_partition): + continue + + try: + result = subprocess.run( + ["/sbin/dumpe2fs", "-h", usb_partition], + capture_output=True, + text=True, + timeout=5 + ) + if "tisbackup" in result.stdout.lower(): + flash("tisbackup backup partition found: %s" % usb_partition) + tisbackup_partition_list.append(usb_partition) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + continue print(tisbackup_partition_list) @@ -351,27 +393,64 @@ def check_usb_disk(): def check_already_mount(partition_name, refresh): + # Validate partition name to prevent path traversal + if not re.match(r"^/dev/[a-z0-9]+$", partition_name): + raise_error("Invalid partition name", "Partition name contains invalid characters") + return "" + with open("/proc/mounts") as f: mount_point = "" for line in f.readlines(): if line.startswith(partition_name): mount_point = line.split(" ")[1] - if not refresh: - run_command("/bin/umount %s" % mount_point) - os.rmdir(mount_point) + if not refresh and mount_point: + try: + subprocess.run(["/bin/umount", mount_point], check=True, timeout=30) + os.rmdir(mount_point) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e: + raise_error(f"Failed to unmount {mount_point}", str(e)) return mount_point -def run_command(cmd, info=""): - flash("Executing: %s" % cmd) - from subprocess import CalledProcessError, check_output +def run_command(cmd_list, info=""): + """Execute a command safely using subprocess.run with list arguments. - result = "" - try: - result = check_output(cmd, stderr=subprocess.STDOUT, shell=True) - except CalledProcessError: - raise_error(result, info) - return result + Args: + cmd_list: List of command arguments (or string for backward compatibility) + info: Additional info message on error + """ + # Handle legacy string commands by converting to list + if isinstance(cmd_list, str): + flash(f"Executing (legacy): {cmd_list}") + # This should be refactored - shell=True is unsafe + try: + result = subprocess.run( + cmd_list, + capture_output=True, + text=True, + shell=True, + timeout=30 + ) + if result.returncode != 0: + raise_error(result.stderr or result.stdout, info) + return result.stdout + except subprocess.TimeoutExpired: + raise_error("Command timeout", info) + else: + flash(f"Executing: {' '.join(cmd_list)}") + try: + result = subprocess.run( + cmd_list, + capture_output=True, + text=True, + check=True, + timeout=30 + ) + return result.stdout + except subprocess.CalledProcessError as e: + raise_error(e.stderr or e.stdout, info) + except subprocess.TimeoutExpired: + raise_error("Command timeout", info) def check_mount_disk(partition_name, refresh):