"""Utility functions around an Amazon S3 client (pytoolbox.aws.s3)."""

from io import BytesIO

from botocore.exceptions import ClientError

from pytoolbox.regex import from_path_patterns


def copy_object(s3, bucket_name, source_key, target_key):
    """Duplicate the object *source_key* to *target_key* inside the same bucket."""
    source = {'Bucket': bucket_name, 'Key': source_key}
    return s3.copy_object(CopySource=source, Bucket=bucket_name, Key=target_key)
def get_bucket_location(s3, bucket_name):
    """Return the location (region) constraint of the given bucket."""
    response = s3.get_bucket_location(Bucket=bucket_name)
    return response['LocationConstraint']
def get_object_url(bucket_name, location, key):
    """Return the HTTPS URL of *key* using the region-specific S3 endpoint."""
    return 'https://s3-{}.amazonaws.com/{}/{}'.format(location, bucket_name, key)
def list_objects(s3, bucket_name, prefix='', patterns='*', regex=False):
    """Yield the bucket's objects whose key matches any of the given patterns.

    A non-empty *prefix* is normalized to end with a slash before listing.
    Patterns are compiled with :func:`from_path_patterns` (glob by default,
    raw regular expressions when *regex* is set).
    """
    if prefix and not prefix.endswith('/'):
        prefix += '/'
    matchers = from_path_patterns(patterns, regex=regex)
    paginator = s3.get_paginator('list_objects')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        contents = page.get('Contents')
        if contents is None:
            # No 'Contents' means an empty listing: nothing left to yield.
            return
        for entry in contents:
            if any(matcher.match(entry['Key']) for matcher in matchers):
                yield entry
def load_object_meta(s3, bucket_name, path, fail=True):
    """Return the metadata (HEAD) of the object stored under *path*.

    When *fail* is False, a missing object yields None instead of raising.
    Any other client error is always re-raised.
    """
    try:
        return s3.head_object(Bucket=bucket_name, Key=path)
    except ClientError as ex:
        # NOTE(review): missing objects are detected by substring matching on
        # the exception text — fragile, but preserved as-is for compatibility.
        missing = 'Not Found' in str(ex)
        if missing and not fail:
            return None
        raise
def read_object(s3, bucket_name, path, file=None, fail=True):
    """Download the object stored under *path*.

    Without *file* the content is returned as bytes; otherwise it is written
    to *file*, which is rewound to the start and returned. When *fail* is
    False, a missing object yields None instead of raising.
    """
    try:
        if file is not None:
            s3.download_fileobj(bucket_name, path, file)
            file.seek(0)
            return file
        with BytesIO() as buffer:
            s3.download_fileobj(bucket_name, path, buffer)
            return buffer.getvalue()
    except ClientError as ex:
        # Same fragile string-based "missing object" detection as the sibling
        # helpers; preserved as-is for compatibility.
        if not fail and 'Not Found' in str(ex):
            return None
        raise
def remove_objects(
    s3,
    bucket_name,
    prefix='',
    patterns=r'*',
    callback=lambda obj: True,
    regex=False,
    simulate=False
):
    """
    Remove objects matching pattern, by chunks of 1000 to be efficient.

    * Set `callback` to a function returning True if given object has to be removed.
    """
    pending = []
    for obj in list_objects(s3, bucket_name, prefix, patterns, regex=regex):
        if not callback(obj):
            continue
        pending.append({'Key': obj['Key']})
        yield obj
        # Flush a full chunk of 1000 keys (the delete_objects API maximum).
        if len(pending) == 1000 and not simulate:
            s3.delete_objects(Bucket=bucket_name, Delete={'Objects': pending})
            pending = []
    # Remove remaining objects
    if pending and not simulate:
        s3.delete_objects(Bucket=bucket_name, Delete={'Objects': pending})
def write_object(s3, bucket_name, path, file):
    """Upload the content of file-like *file* to the object stored under *path*."""
    s3.upload_fileobj(file, bucket_name, path)