"""Clone the contents of one S3 bucket into another using boto3."""

from typing import Optional

import boto3
from botocore.exceptions import ClientError

# get_bucket_location reports these legacy values instead of a region name:
# None stands for us-east-1 and "EU" for eu-west-1.
_LEGACY_LOCATIONS = {None: "us-east-1", "EU": "eu-west-1"}


def get_bucket_region(source: str) -> Optional[str]:
    """Return the location constraint of the bucket named *source*.

    The value is passed through exactly as S3 reports it, so it is ``None``
    for buckets located in us-east-1.

    Args:
        source: Name of the bucket to look up.

    Returns:
        The ``LocationConstraint`` field of the ``get_bucket_location``
        response.

    Raises:
        ClientError: If the lookup request fails.
    """
    client = boto3.client("s3")
    # Query the bucket the caller asked about rather than a fixed name.
    response = client.get_bucket_location(Bucket=source)
    return response["LocationConstraint"]


def clone_bucket(source: str, target: str) -> None:
    """Copy every object of bucket *source* into bucket *target*.

    The target bucket is created first (via ``create_bucket``) when it is not
    already listed for the caller. Each object is copied under its original
    key and a confirmation line is printed per object. A ``ClientError``
    raised anywhere in the process is printed instead of propagated, which
    also stops the copy at the failing object.

    Args:
        source: Name of the bucket to read from.
        target: Name of the bucket to write to.
    """
    try:
        s3 = boto3.resource("s3")
        src_bucket = s3.Bucket(source)
        create_bucket(target, source)
        dest_bucket = s3.Bucket(target)
        for obj in src_bucket.objects.all():
            dest_bucket.copy({"Bucket": source, "Key": obj.key}, obj.key)
            print(obj.key + "- File Copied")
    except ClientError as e:
        print(e)


def delete_bucket(bucket_name: str) -> None:
    """Empty the bucket *bucket_name* and then delete it.

    A ``ClientError`` raised along the way is printed instead of propagated.

    Args:
        bucket_name: Name of the bucket to remove.
    """
    try:
        s3 = boto3.resource("s3")
        s3_bucket = s3.Bucket(bucket_name)
        bucket_versioning = s3.BucketVersioning(bucket_name)
        # A bucket whose versioning is suspended can still hold old versions;
        # they must be purged too or the final bucket delete fails.
        if bucket_versioning.status in ("Enabled", "Suspended"):
            s3_bucket.object_versions.delete()
        else:
            s3_bucket.objects.all().delete()
        s3_bucket.delete()
        print("S3 bucket deleted successfully!")
    except ClientError as e:
        print(e)


def create_bucket(bucket_name: str, source: str) -> None:
    """Create bucket *bucket_name* in the same region as bucket *source*.

    Nothing is created when *bucket_name* already appears in the caller's
    bucket list. A status line is printed in either case.

    Args:
        bucket_name: Name of the bucket to create.
        source: Name of an existing bucket whose region is reused.

    Raises:
        ClientError: If listing, region lookup or creation fails.
    """
    s3 = boto3.client("s3")
    existing = {bucket["Name"] for bucket in s3.list_buckets()["Buckets"]}
    if bucket_name in existing:
        print("bucket exists, so skipping")
        return

    print("bucket not found, so creating")
    location = get_bucket_region(source)
    region = _LEGACY_LOCATIONS.get(location, location)
    # Send the request to the target region's endpoint so that the location
    # constraint and the endpoint always agree.
    regional_client = boto3.client("s3", region_name=region)
    if region == "us-east-1":
        # us-east-1 is the default location and must be requested by leaving
        # out CreateBucketConfiguration entirely.
        regional_client.create_bucket(Bucket=bucket_name)
    else:
        regional_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )


# NOTE(review): this call runs whenever the file is executed or imported;
# consider moving it under an `if __name__ == "__main__":` guard.
clone_bucket("projectn-rpm-775902114032", "projectn-rpm-775902114032-newrsp")

# Ref:
# https://www.stackvidhya.com/copy-move-files-between-buckets-using-boto3/
# https://boto3.amazonaws.com/v1/documentation/api/1.9.42/guide/s3-example-creating-buckets.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket
# https://cloudaffaire.com/faq/how-to-delete-an-s3-bucket-with-content-using-boto3/
No comments:
Post a Comment