import boto3
from botocore.exceptions import ClientError
def get_bucket_region(source):
    """Return the location constraint (region) of the bucket named ``source``.

    Args:
        source: Name of the S3 bucket to look up.

    Returns:
        The ``LocationConstraint`` value from ``get_bucket_location``.
        S3 reports ``None`` for buckets in us-east-1, so callers must be
        prepared for a ``None`` result.

    Raises:
        botocore.exceptions.ClientError: Propagated from the S3 call (for
            example when the bucket does not exist or access is denied).
    """
    client = boto3.client('s3')
    # Query the bucket the caller asked about; previously a hard-coded
    # bucket name was used and the ``source`` argument was ignored.
    response = client.get_bucket_location(Bucket=source)
    return response['LocationConstraint']
def clone_bucket(source, target):
    """Copy every object from bucket ``source`` into bucket ``target``.

    The target bucket is created first (via ``create_bucket``) when it does
    not already exist. Each object is copied under the same key and a
    progress line is printed per object. Any ``ClientError`` raised along
    the way is printed rather than propagated, which ends the copy at the
    point of failure.

    Args:
        source: Name of the bucket to copy from.
        target: Name of the bucket to copy into.
    """
    try:
        resource = boto3.resource('s3')
        origin = resource.Bucket(source)

        # Ensure the destination exists before any object is copied.
        create_bucket(target, source)
        destination = resource.Bucket(target)

        for obj in origin.objects.all():
            key = obj.key
            destination.copy({'Bucket': source, 'Key': key}, key)
            print(key + '- File Copied')
    except ClientError as err:
        print(err)
def delete_bucket(bucket_name):
    """Empty the bucket ``bucket_name`` and then delete it.

    If versioning is or ever was turned on for the bucket, every object
    version and delete marker is removed; otherwise the current objects
    are removed. The bucket itself is deleted afterwards. Any
    ``ClientError`` is printed rather than propagated.

    Args:
        bucket_name: Name of the S3 bucket to delete.
    """
    try:
        s3 = boto3.resource('s3')
        s3_bucket = s3.Bucket(bucket_name)
        bucket_versioning = s3.BucketVersioning(bucket_name)
        # A bucket whose versioning is 'Suspended' can still hold older
        # versions and delete markers; deleting only the current objects
        # would leave them behind and the bucket delete would fail.
        if bucket_versioning.status in ('Enabled', 'Suspended'):
            s3_bucket.object_versions.delete()
        else:
            s3_bucket.objects.all().delete()
        s3_bucket.delete()
        print("S3 bucket deleted successfully!")
    except ClientError as e:
        print(e)
def create_bucket(bucket_name, source):
    """Create bucket ``bucket_name`` in the same region as bucket ``source``.

    Nothing is created when a bucket with that name already appears in the
    caller's ``list_buckets`` result.

    Args:
        bucket_name: Name of the bucket to create.
        source: Name of an existing bucket whose region is reused for the
            new bucket (looked up through ``get_bucket_region``).

    Raises:
        botocore.exceptions.ClientError: Propagated from the S3 calls.
    """
    s3 = boto3.client('s3')
    existing = {bucket['Name'] for bucket in s3.list_buckets()['Buckets']}

    if bucket_name in existing:
        print("bucket exists, so skipping")
        return

    print("bucket not found, so creating")
    region = get_bucket_region(source)
    if region is None:
        # get_bucket_location reports None for us-east-1, and None is not
        # an accepted LocationConstraint; omitting the configuration makes
        # S3 create the bucket in us-east-1.
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region},
        )
# Script entry: clone the RPM bucket into its "-newrsp" counterpart.
# delete_bucket(target)
clone_bucket(
    source="projectn-rpm-775902114032",
    target="projectn-rpm-775902114032-newrsp",
)
# References:
# https://www.stackvidhya.com/copy-move-files-between-buckets-using-boto3/
#https://boto3.amazonaws.com/v1/documentation/api/1.9.42/guide/s3-example-creating-buckets.html
#https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket
#https://cloudaffaire.com/faq/how-to-delete-an-s3-bucket-with-content-using-boto3/
# No comments:
# Post a Comment