Backup and Purge RDS Snapshots in AWS

Here is a script I’ve put together to backup both cluster and non clustered databases in AWS. (Still in progress).

import boto3
import datetime
import sys
#   Author: Mark Young
#   Date:   3rd October 2017
#   Detail: Automatic database snapshots are restricted to 35 days
#           This script enables us to keep snapshots for extended periods
#   Audit:  1.0 - M.Y. Original
#           1.1 - M.Y. Added retention variables and cluster name

INTERVAL_TYPE   = 'days'
INTERVAL_NUM    = 1
#RDS_NAME    = 'AURORATEST'


#for arg in sys.argv:
rds_type    = sys.argv[1].upper()
RDS_NAME    = sys.argv[2].upper()
Purge       = sys.argv[3].upper()

client = boto3.client('rds')
SNAPSHOT_NAME=datetime.datetime.now().strftime("%y-%m-%d-%H-%S")



# Backups for cluster databases
def Cluster_backup(RDS_NAME):
        print("Connecting to Cluster Database")
        print("Cluster Database snapshot backups stated... for " + RDS_NAME + ' - Snapshot ' + SNAPSHOT_NAME)
        snapshot_description = client.describe_db_cluster_snapshots(DBClusterIdentifier= RDS_NAME)
        while snapshot_description['DBClusterSnapshots'][0]['Status'] != 'available' :
            print("still waiting")
            time.sleep(15)
            snapshot_description = client.describe_db_cluster_snapshots(DBClusterIdentifier= RDS_NAME)
            response = client.create_db_cluster_snapshot(
                DBClusterSnapshotIdentifier= RDS_NAME + '-s-' + SNAPSHOT_NAME,
                DBClusterIdentifier= RDS_NAME,
                Tags=[
                    {
                        'Key': 'Backup',
                        'Value': 'MyToll'
                    },
                    ]
                    )


#For debugging
#print(response)
def Cluster_purge(RDS_NAME):
        print('Looking for items to purge - Retention ' + str(INTERVAL_NUM) + ' ' + INTERVAL_TYPE )
        for snapshot in client.describe_db_cluster_snapshots(DBClusterIdentifier= RDS_NAME, MaxRecords=50)['DBClusterSnapshots']:
            create_ts = snapshot['SnapshotCreateTime'].replace(tzinfo=None)
            if create_ts < datetime.datetime.now() - datetime.timedelta(**{INTERVAL_TYPE: INTERVAL_NUM}):
                print("Deleting snapshot id:", snapshot['DBClusterSnapshotIdentifier'])
                client.delete_db_cluster_snapshot(
                    DBClusterSnapshotIdentifier=snapshot['DBClusterSnapshotIdentifier']
                )
        #For debugging
        #print(response)
        my_response ="Completed snapshot copy and purge for cluster database..."
        return my_response

    # Backups for cluster databases
def RDS_backup(RDS_NAME):
        print("Connecting to RDS Database")
        print("Database snapshot backups stated... for " + RDS_NAME + ' - Snapshot ' + SNAPSHOT_NAME)
        response = client.create_db_snapshot(
            DBSnapshotIdentifier= RDS_NAME + '-s-' + SNAPSHOT_NAME,
            DBIdentifier= RDS_NAME,
            Tags=[
                {
                    'Key': 'Backup',
                    'Value': 'MyToll'
                },
            ]
        )

def RDS_purge(RDS_NAME):
        #For debugging
        #print(response)
        print('Looking for items to purge - Retention ' + str(INTERVAL_NUM) + ' ' + INTERVAL_TYPE )
        for snapshot in client.describe_db_snapshots(DBIdentifier= RDS_NAME, MaxRecords=50)['DBSnapshots']:
            create_ts = snapshot['SnapshotCreateTime'].replace(tzinfo=None)
            if create_ts < datetime.datetime.now() - datetime.timedelta(**{INTERVAL_TYPE: INTERVAL_NUM}):
                print("Deleting snapshot id:", snapshot['DBSnapshotIdentifier'])
                client.delete_db_cluster_snapshot(
                    DBSnapshotIdentifier=snapshot['DBSnapshotIdentifier']
                )
        #For debugging
        #print(response)
        my_response ="Completed snapshot copy and purge for RDS database..."
        return my_response

if rds_type == '-H' or rds_type == '':
    print("Usage: rds_backup.py RDS NAME: <CLUSTER/RDS> DB NAME:<RDS/CLUSTER NAME> PURGE:<PURGE/NOPURGE>")
    print:("i.e. rds_backup.py CLUSTER AURORATEST TRUE")
    sys.exit

if rds_type == 'CLUSTER':
    Cluster_backup(RDS_NAME)
    if Purge == 'PURGE':
        Cluster_purge(RDS_NAME)
if rds_type == 'RDS':
    RDS_backup(RDS_NAME)
    if Purge == 'PURGE':
        Cluster_purge(RDS_NAME)

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.