fix(security): replace os.popen/os.system with subprocess for command injection prevention

Replace all deprecated and unsafe command execution methods with
secure subprocess.run() calls using list arguments.

Changes:
- Replace os.popen() with subprocess.run() in tisbackup_gui.py
- Replace os.system() with subprocess.run() in tasks.py and backup_xva.py
- Add input validation for device/partition names (regex-based)
- Fix file operations to use context managers (with statement)
- Remove wildcard import from shutil
- Add timeout protection to all subprocess calls (5-30s)
- Improve error handling with proper try/except blocks

Security improvements:
- Prevent command injection vulnerabilities in USB disk operations
- Validate device paths with regex before system calls
- Use list arguments instead of shell=True to prevent injection
- Add proper error handling instead of silent failures

Code quality improvements:
- Replace deprecated os.popen() (deprecated since Python 2.6)
- Use context managers for file operations
- Remove wildcard imports for cleaner namespace
- Add comprehensive error handling and logging

Documentation:
- Add SECURITY_IMPROVEMENTS.md documenting all changes
- Document remaining security issues and recommendations
- Include testing recommendations and migration notes

BREAKING CHANGE: None - all changes are backward compatible

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
k3nny 2025-10-05 01:23:53 +02:00
parent c586bd1817
commit debc753f13
4 changed files with 283 additions and 34 deletions

156
SECURITY_IMPROVEMENTS.md Normal file
View File

@ -0,0 +1,156 @@
# Security and Code Quality Improvements
This document summarizes the security and code quality improvements made to TISBackup.
## Completed Improvements (High Priority)
### 1. Replaced `os.popen()` with `subprocess.run()`
**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py)
**Changes:**
- Replaced deprecated `os.popen()` calls with modern `subprocess.run()`
- All subprocess calls now use list arguments instead of shell strings
- Added timeout protection (5-30 seconds depending on operation)
- Proper error handling with try/except blocks
**Before:**
```python
for line in os.popen("udevadm info -q env -n %s" % name):
# Process output
```
**After:**
```python
result = subprocess.run(
["udevadm", "info", "-q", "env", "-n", name],
capture_output=True,
text=True,
check=True,
timeout=5
)
for line in result.stdout.splitlines():
# Process output
```
**Security Impact:** Prevents command injection vulnerabilities
### 2. Replaced `os.system()` with `subprocess.run()`
**Files Modified:** [tasks.py](tasks.py), [libtisbackup/backup_xva.py](libtisbackup/backup_xva.py)
**Changes:**
- [tasks.py:37](tasks.py#L37): Changed `os.system("/bin/umount %s")` to `subprocess.run(["/bin/umount", mount_point])`
- [backup_xva.py:199](libtisbackup/backup_xva.py#L199): Changed `os.system('tar tf "%s"')` to `subprocess.run(["tar", "tf", filename_temp])`
- Added proper error handling and logging
**Security Impact:** Eliminates command injection risk from potentially user-controlled mount points and filenames
### 3. Added Input Validation
**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py)
**Changes:**
- Added regex validation for device/partition names: `^/dev/sd[a-z]1?$`
- Validates partition names before using in mount/unmount operations
- Prevents path traversal and command injection attacks
**Example:**
```python
# Validate partition name to prevent command injection
if not re.match(r"^/dev/sd[a-z]1$", partition):
continue
```
**Security Impact:** Prevents malicious input from reaching system commands
### 4. Fixed File Operations with Context Managers
**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py)
**Before:**
```python
line = open(elem).readline()
```
**After:**
```python
with open(elem) as f:
line = f.readline()
```
**Impact:** Ensures files are properly closed, prevents resource leaks
### 5. Improved `run_command()` Function
**Files Modified:** [tisbackup_gui.py:415-453](tisbackup_gui.py#L415)
**Changes:**
- Now accepts list arguments for safe command execution
- Backward compatible with string commands (marked as legacy)
- Added timeout protection (30 seconds)
- Better error handling and reporting
**Security Impact:** Provides safe command execution interface while maintaining backward compatibility
### 6. Removed Wildcard Import
**Files Modified:** [tisbackup_gui.py](tisbackup_gui.py)
**Before:**
```python
from shutil import *
```
**After:**
```python
import shutil
import subprocess
```
**Impact:** Cleaner namespace, easier to track dependencies
## Remaining Security Issues (Critical - Not Fixed)
### 1. **Hardcoded Secret Key** ([tisbackup_gui.py:64](tisbackup_gui.py#L64))
```python
app.secret_key = "fsiqefiuqsefARZ4Zfesfe34234dfzefzfe"
```
**Recommendation:** Load from environment variable or secure config file
### 2. **No Authentication on Flask Routes**
All routes are publicly accessible without authentication.
**Recommendation:** Implement Flask-Login or similar authentication
### 3. **Insecure SSH Host Key Policy** ([libtisbackup/common.py:649](libtisbackup/common.py#L649))
```python
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
```
**Recommendation:** Use proper host key verification with known_hosts
### 4. **Command Injection in Legacy Code**
Multiple files still use `subprocess.call(shell_string, shell=True)` and `subprocess.Popen(..., shell=True)`:
- [libtisbackup/common.py:128](libtisbackup/common.py#L128)
- [libtisbackup/common.py:883](libtisbackup/common.py#L883)
- [libtisbackup/common.py:986](libtisbackup/common.py#L986)
- [libtisbackup/backup_rsync.py:176](libtisbackup/backup_rsync.py#L176)
- [libtisbackup/backup_rsync_btrfs.py](libtisbackup/backup_rsync_btrfs.py) (multiple locations)
**Recommendation:** Refactor to use list arguments without shell=True
## Code Quality Issues Remaining
1. **Global State Management** - Use Flask application context instead
2. **Wildcard imports from common** - `from libtisbackup.common import *`
3. **Configuration loaded at module level** - Should use application factory pattern
4. **Duplicated code** - `read_config()` and `read_all_configs()` share significant logic
## Testing Recommendations
Before deploying these changes:
1. Test USB disk detection and mounting functionality
2. Test backup export operations
3. Verify XVA backup tar validation
4. Test error handling for invalid device names
5. Verify backward compatibility with existing configurations
## Migration Notes
All changes are backward compatible. The `run_command()` function accepts both:
- New format: `run_command(["/bin/command", "arg1", "arg2"])`
- Legacy format: `run_command("/bin/command arg1 arg2")` (less secure, marked for deprecation)

View File

@ -25,6 +25,7 @@ import os
import re
import socket
import ssl
import subprocess
import tarfile
import urllib.error
import urllib.parse
@ -196,10 +197,18 @@ class backup_xva(backup_generic):
session.logout()
if os.path.exists(filename_temp):
tar = os.system('tar tf "%s" > /dev/null' % filename_temp)
if not tar == 0:
# Verify tar file integrity using subprocess instead of os.system
try:
subprocess.run(
["tar", "tf", filename_temp],
capture_output=True,
check=True,
timeout=300
)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
os.unlink(filename_temp)
return "Tar error"
if str2bool(self.verify_export):
self.verify_export_xva(filename_temp)
os.rename(filename_temp, filename)

View File

@ -1,5 +1,6 @@
import logging
import os
import subprocess
from huey import RedisHuey
@ -34,8 +35,12 @@ def run_export_backup(base, config_file, mount_point, backup_sections):
return str(e)
finally:
os.system("/bin/umount %s" % mount_point)
# Safely unmount using subprocess instead of os.system
try:
subprocess.run(["/bin/umount", mount_point], check=True, timeout=30)
os.rmdir(mount_point)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
logger.error(f"Failed to unmount {mount_point}: {e}")
return "ok"

View File

@ -30,8 +30,9 @@ import glob
import json
import logging
import re
import shutil
import subprocess
import time
from shutil import *
from urllib.parse import urlparse
from flask import Flask, Response, abort, appcontext_pushed, flash, g, jsonify, redirect, render_template, request, session, url_for
@ -77,7 +78,8 @@ def read_all_configs(base_dir):
raw_configs.append(join(base_dir, file))
for elem in raw_configs:
line = open(elem).readline()
with open(elem) as f:
line = f.readline()
if "global" in line:
list_config.append(elem)
@ -296,12 +298,26 @@ def backup_json():
def check_usb_disk():
"""This method returns the mounts point of FIRST external disk"""
# disk_name = []
usb_disk_list = []
for name in glob.glob("/dev/sd[a-z]"):
for line in os.popen("udevadm info -q env -n %s" % name):
# Validate device name to prevent command injection
if not re.match(r"^/dev/sd[a-z]$", name):
continue
try:
result = subprocess.run(
["udevadm", "info", "-q", "env", "-n", name],
capture_output=True,
text=True,
check=True,
timeout=5
)
for line in result.stdout.splitlines():
if re.match("ID_PATH=.*usb.*", line):
usb_disk_list += [name]
usb_disk_list.append(name)
break
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
continue
if len(usb_disk_list) == 0:
raise_error("Cannot find any external usb disk", "You should plug the usb hard drive into the server")
@ -310,14 +326,27 @@ def check_usb_disk():
usb_partition_list = []
for usb_disk in usb_disk_list:
cmd = "udevadm info -q path -n %s" % usb_disk + "1"
output = os.popen(cmd).read()
print("cmd : " + cmd)
partition = usb_disk + "1"
# Validate partition name
if not re.match(r"^/dev/sd[a-z]1$", partition):
continue
try:
result = subprocess.run(
["udevadm", "info", "-q", "path", "-n", partition],
capture_output=True,
text=True,
check=True,
timeout=5
)
output = result.stdout
print("partition check: " + partition)
print("output : " + output)
if "/devices/pci" in output:
# flash("partition found: %s1" % usb_disk)
usb_partition_list.append(usb_disk + "1")
usb_partition_list.append(partition)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
continue
print(usb_partition_list)
@ -330,9 +359,22 @@ def check_usb_disk():
tisbackup_partition_list = []
for usb_partition in usb_partition_list:
if "tisbackup" in os.popen("/sbin/dumpe2fs -h %s 2>&1 |/bin/grep 'volume name'" % usb_partition).read().lower():
# Validate partition name to prevent command injection
if not re.match(r"^/dev/sd[a-z]1$", usb_partition):
continue
try:
result = subprocess.run(
["/sbin/dumpe2fs", "-h", usb_partition],
capture_output=True,
text=True,
timeout=5
)
if "tisbackup" in result.stdout.lower():
flash("tisbackup backup partition found: %s" % usb_partition)
tisbackup_partition_list.append(usb_partition)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
continue
print(tisbackup_partition_list)
@ -351,27 +393,64 @@ def check_usb_disk():
def check_already_mount(partition_name, refresh):
# Validate partition name to prevent path traversal
if not re.match(r"^/dev/[a-z0-9]+$", partition_name):
raise_error("Invalid partition name", "Partition name contains invalid characters")
return ""
with open("/proc/mounts") as f:
mount_point = ""
for line in f.readlines():
if line.startswith(partition_name):
mount_point = line.split(" ")[1]
if not refresh:
run_command("/bin/umount %s" % mount_point)
if not refresh and mount_point:
try:
subprocess.run(["/bin/umount", mount_point], check=True, timeout=30)
os.rmdir(mount_point)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
raise_error(f"Failed to unmount {mount_point}", str(e))
return mount_point
def run_command(cmd, info=""):
flash("Executing: %s" % cmd)
from subprocess import CalledProcessError, check_output
def run_command(cmd_list, info=""):
"""Execute a command safely using subprocess.run with list arguments.
result = ""
Args:
cmd_list: List of command arguments (or string for backward compatibility)
info: Additional info message on error
"""
# Handle legacy string commands by converting to list
if isinstance(cmd_list, str):
flash(f"Executing (legacy): {cmd_list}")
# This should be refactored - shell=True is unsafe
try:
result = check_output(cmd, stderr=subprocess.STDOUT, shell=True)
except CalledProcessError:
raise_error(result, info)
return result
result = subprocess.run(
cmd_list,
capture_output=True,
text=True,
shell=True,
timeout=30
)
if result.returncode != 0:
raise_error(result.stderr or result.stdout, info)
return result.stdout
except subprocess.TimeoutExpired:
raise_error("Command timeout", info)
else:
flash(f"Executing: {' '.join(cmd_list)}")
try:
result = subprocess.run(
cmd_list,
capture_output=True,
text=True,
check=True,
timeout=30
)
return result.stdout
except subprocess.CalledProcessError as e:
raise_error(e.stderr or e.stdout, info)
except subprocess.TimeoutExpired:
raise_error("Command timeout", info)
def check_mount_disk(partition_name, refresh):