Automate MySQL database backups with a Python script that runs daily via cron and saves to cloud storage.
Keep your data safe with automated daily backups. This script creates compressed MySQL dumps and optionally uploads them to AWS S3 (the same approach can be adapted for Google Cloud Storage, which this script does not implement).
Run daily at 2 AM: 0 2 * * * /usr/bin/python3 /path/to/backup.py
#!/usr/bin/env python3
"""
Daily Database Backup Script
Schedule with cron: 0 2 * * * /usr/bin/python3 /path/to/backup.py
"""
import gzip
import os
import shutil
import smtplib
import subprocess
from datetime import datetime, timedelta
from email.mime.text import MIMEText

import boto3  # For AWS S3 upload (optional)
from dotenv import load_dotenv
# Load settings from a .env file (python-dotenv) before the environment
# lookups below run, so values defined there are visible to them.
load_dotenv()
# ---------------------------------------------------------------------------
# Settings, all read from the process environment.
# ---------------------------------------------------------------------------

# Database connection and local backup storage
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_USER = os.environ.get("DB_USER", "root")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_NAME = os.environ.get("DB_NAME")
BACKUP_DIR = os.environ.get("BACKUP_DIR", "/var/backups/mysql")
RETENTION_DAYS = int(os.environ.get("RETENTION_DAYS", 7))

# Outgoing mail used for failure notifications
SMTP_HOST = os.environ.get("SMTP_HOST")
SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))
SMTP_USER = os.environ.get("SMTP_USER")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD")
ALERT_EMAIL = os.environ.get("ALERT_EMAIL")

# Optional AWS S3 upload; enabled only when S3_ENABLED is "true"
# (compared case-insensitively)
S3_BUCKET = os.environ.get("S3_BUCKET")
S3_ENABLED = os.environ.get("S3_ENABLED", "false").lower() == "true"
# Upper bound, in seconds, for connecting to and talking with the SMTP
# server. Without it an unresponsive mail host would block the script
# forever, and the script is meant to run unattended from cron.
_SMTP_TIMEOUT_SECONDS = 30


def send_alert(subject, message):
    """Send a plain-text notification email, on a best-effort basis.

    Args:
        subject: Text for the Subject header.
        message: Plain-text body of the email.

    Returns:
        None. If any of SMTP_HOST, SMTP_USER, SMTP_PASSWORD or ALERT_EMAIL
        is unset, a warning is printed and nothing is sent. Errors raised
        while building or sending the mail are printed, not propagated, so
        a broken mail setup never masks the failure being reported.
    """
    if not all([SMTP_HOST, SMTP_USER, SMTP_PASSWORD, ALERT_EMAIL]):
        print("⚠️ Email settings not configured")
        return

    try:
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = SMTP_USER
        msg['To'] = ALERT_EMAIL

        with smtplib.SMTP(
            SMTP_HOST, SMTP_PORT, timeout=_SMTP_TIMEOUT_SECONDS
        ) as server:
            # Upgrade to TLS before the credentials go over the wire.
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.send_message(msg)
        print("✅ Alert email sent")
    except Exception as e:
        # Deliberately broad: alerting must never crash the backup run.
        print(f"❌ Failed to send alert: {e}")
def create_backup():
    """Create a gzip-compressed mysqldump of DB_NAME inside BACKUP_DIR.

    mysqldump is started without a shell and its output is streamed into
    a gzip file from Python. This means the exit status that gets checked
    is mysqldump's own (in a ``mysqldump | gzip`` shell pipeline only
    gzip's status is seen, so failed dumps looked successful), and the
    password never appears in the command line or in error messages.

    Returns:
        The path of the new ``.sql.gz`` file, or ``None`` when the backup
        failed. On failure any partial file is removed and an alert is
        sent through ``send_alert``.
    """
    if not DB_NAME:
        error_msg = "Backup failed: DB_NAME is not configured"
        print(f"❌ {error_msg}")
        send_alert("Database Backup Failed", error_msg)
        return None

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"{BACKUP_DIR}/{DB_NAME}_{timestamp}.sql.gz"

    # Argument list (no shell): configuration values cannot be interpreted
    # as shell syntax.
    dump_cmd = ["mysqldump", "-h", DB_HOST, "-u", DB_USER, DB_NAME]

    # Hand the password over through the environment so it is not visible
    # in the process list; omit it entirely when none is configured.
    # NOTE(review): MYSQL_PWD is marked deprecated in the MySQL 8.0 docs;
    # switch to --defaults-extra-file if the client stops honoring it.
    dump_env = os.environ.copy()
    if DB_PASSWORD:
        dump_env["MYSQL_PWD"] = DB_PASSWORD

    try:
        # Create backup directory if it doesn't exist
        os.makedirs(BACKUP_DIR, exist_ok=True)

        print(f"🔄 Creating backup: {backup_file}")
        # compresslevel=6 matches the gzip command-line default.
        with gzip.open(backup_file, "wb", compresslevel=6) as archive:
            with subprocess.Popen(
                dump_cmd, stdout=subprocess.PIPE, env=dump_env
            ) as dump:
                shutil.copyfileobj(dump.stdout, archive)
        # Leaving the Popen context waits for the process, so returncode
        # is populated here.
        if dump.returncode != 0:
            raise subprocess.CalledProcessError(dump.returncode, dump_cmd)

        # Get file size
        size_mb = os.path.getsize(backup_file) / (1024 * 1024)
        print(f"✅ Backup created successfully ({size_mb:.2f} MB)")
        return backup_file
    except (subprocess.CalledProcessError, OSError) as e:
        # Do not leave a truncated archive behind that could be mistaken
        # for (or uploaded as) a valid backup.
        try:
            os.remove(backup_file)
        except FileNotFoundError:
            pass  # nothing had been written yet
        except OSError as cleanup_err:
            print(f"⚠️ Could not remove partial backup {backup_file}: {cleanup_err}")

        error_msg = f"Backup failed: {e}"
        print(f"❌ {error_msg}")
        send_alert("Database Backup Failed", error_msg)
        return None
def upload_to_s3(file_path):
    """Copy a local backup file into the configured S3 bucket.

    Nothing happens unless S3 uploads are switched on and a bucket name
    is set. The object is stored under ``backups/<file name>``. Any error
    raised during the upload is printed and reported through
    ``send_alert`` rather than propagated.
    """
    if not (S3_ENABLED and S3_BUCKET):
        return

    try:
        client = boto3.client('s3')
        object_key = f"backups/{os.path.basename(file_path)}"
        print(f"☁️ Uploading to S3: {S3_BUCKET}/{object_key}")
        client.upload_file(file_path, S3_BUCKET, object_key)
        print("✅ S3 upload successful")
    except Exception as exc:
        print(f"❌ S3 upload failed: {exc}")
        send_alert("S3 Upload Failed", str(exc))
# Suffix of the dump files this script writes. Only files carrying it are
# ever candidates for deletion.
_BACKUP_SUFFIX = ".sql.gz"


def cleanup_old_backups():
    """Delete backup files in BACKUP_DIR older than RETENTION_DAYS days.

    Only regular files whose name ends in ``.sql.gz`` are considered, so
    unrelated files that happen to live in the backup directory are never
    removed. Age is taken from each file's modification time. A file that
    cannot be inspected or removed is reported and skipped; the remaining
    files are still processed.

    Returns:
        None. Does nothing when BACKUP_DIR is not an existing directory.
    """
    if not os.path.isdir(BACKUP_DIR):
        return

    cutoff_date = datetime.now() - timedelta(days=RETENTION_DAYS)
    removed_count = 0

    for filename in os.listdir(BACKUP_DIR):
        if not filename.endswith(_BACKUP_SUFFIX):
            continue

        file_path = os.path.join(BACKUP_DIR, filename)
        try:
            if not os.path.isfile(file_path):
                continue
            file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_time >= cutoff_date:
                continue
            os.remove(file_path)
        except OSError as e:
            # Includes the file vanishing between listdir() and here.
            print(f"⚠️ Failed to remove {filename}: {e}")
            continue

        removed_count += 1
        print(f"🗑️ Removed old backup: {filename}")

    if removed_count > 0:
        print(f"✅ Cleaned up {removed_count} old backup(s)")
def main():
    """Run one backup cycle: dump, optional S3 upload, then local cleanup.

    The upload and cleanup steps run only when the dump produced a file.

    Raises:
        SystemExit: with status 1 when the database dump fails, so cron
            (or any other caller) sees a non-zero exit code.
    """
    banner = "=" * 50

    print(banner)
    print(f"Database Backup Started: {datetime.now()}")
    print(banner)

    # Create backup
    backup_file = create_backup()
    if not backup_file:
        print(banner)
        print("❌ Backup failed")
        print(banner)
        # Raise SystemExit directly: the exit() builtin is a helper the
        # site module adds for interactive sessions and is not meant for
        # use in programs.
        raise SystemExit(1)

    # Upload to cloud (optional)
    upload_to_s3(backup_file)

    # Cleanup old backups
    cleanup_old_backups()

    print(banner)
    print("✅ Backup completed successfully")
    print(banner)


if __name__ == "__main__":
    main()