Module hub.core.storage.s3

Expand source code
import posixpath
from typing import Optional

import boto3
import botocore  # type: ignore
from hub.core.storage.provider import StorageProvider
from hub.util.exceptions import S3DeletionError, S3GetError, S3ListError, S3SetError


class S3Provider(StorageProvider):
    """Provider class for using S3 storage.

    Objects live in ``self.bucket`` under the key prefix ``self.path``; both are
    derived from the ``root`` passed to the constructor.
    """

    def __init__(
        self,
        root: str,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        aws_region: Optional[str] = None,
        max_pool_connections: Optional[int] = 10,
        client=None,
    ):
        """Initializes the S3Provider

        Example:
            s3_provider = S3Provider("snark-test/benchmarks")

        Args:
            root (str): The root of the provider, as "bucket/prefix" with an optional leading "s3://".
                All read/write request keys will be appended to root.
            aws_access_key_id (optional, str): Specifies the AWS access key used as part of the credentials to authenticate the user.
            aws_secret_access_key (optional, str): Specifies the AWS secret key used as part of the credentials to authenticate the user.
            aws_session_token (optional, str): Specifies an AWS session token used as part of the credentials to authenticate the user.
            endpoint_url (optional, str): The complete URL to use for the constructed client.
                This needs to be provided for cases in which you're interacting with MinIO, Wasabi, etc.
            aws_region (optional, str): Specifies the AWS Region to send requests to.
            max_pool_connections (optional, int): The maximum number of connections to keep in a connection pool.
                If this value is not set, the default value of 10 is used.
            client (optional): boto3.client object. If this is passed, it is used as the client while making requests
                and no boto3 client or resource is created from the credential arguments.
        """
        self.aws_region = aws_region
        self.endpoint_url = endpoint_url

        # Strip the scheme only when it leads the string; an "s3://" appearing
        # later would be part of the key and must be left alone.
        if root.startswith("s3://"):
            root = root[len("s3://") :]

        # The first segment is the bucket, everything after it is the key prefix.
        self.bucket, _, self.path = root.partition("/")

        self.client_config = botocore.config.Config(
            max_pool_connections=max_pool_connections,
        )

        # Shared by the client and the resource so the two cannot drift apart.
        session_kwargs = dict(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            config=self.client_config,
            endpoint_url=self.endpoint_url,
            region_name=self.aws_region,
        )

        self.client = client or boto3.client("s3", **session_kwargs)
        # The resource is only available when we own the credentials; with a
        # caller-supplied client, clear() falls back to per-key deletion.
        self.resource = None
        if client is None:
            self.resource = boto3.resource("s3", **session_kwargs)

    def _key_prefix(self):
        """Returns self.path terminated by "/" (or "" when there is no path).

        Listing with an un-terminated prefix such as "benchmarks" would also match
        sibling prefixes like "benchmarks2/", so list/delete requests use this form.
        """
        if not self.path or self.path.endswith("/"):
            return self.path
        return self.path + "/"

    def __setitem__(self, path, content):
        """Sets the object present at the path with the value

        Args:
            path (str): the path relative to the root of the S3Provider.
            content (bytes): the value to be assigned at the path.

        Raises:
            S3SetError: Any S3 error encountered while setting the value at the path.
        """
        try:
            key = posixpath.join(self.path, path)
            # Accept any buffer-protocol object and hand boto3 a plain byte buffer.
            body = bytearray(memoryview(content))
            self.client.put_object(
                Bucket=self.bucket,
                Body=body,
                Key=key,
                ContentType="application/octet-stream",  # signifies binary data
            )
        except Exception as err:
            raise S3SetError(err) from err

    def __getitem__(self, path):
        """Gets the object present at the path.

        Args:
            path (str): the path relative to the root of the S3Provider.

        Returns:
            bytes: The bytes of the object present at the path.

        Raises:
            KeyError: If an object is not found at the path.
            S3GetError: Any other error other than KeyError while retrieving the object.
        """
        try:
            key = posixpath.join(self.path, path)
            resp = self.client.get_object(
                Bucket=self.bucket,
                Key=key,
            )
            return resp["Body"].read()
        except botocore.exceptions.ClientError as err:
            # A missing object is reported as KeyError to honour the mapping contract.
            if err.response["Error"]["Code"] == "NoSuchKey":
                raise KeyError(err) from err
            raise S3GetError(err) from err
        except Exception as err:
            raise S3GetError(err) from err

    def __delitem__(self, path):
        """Delete the object present at the path.

        Args:
            path (str): the path to the object relative to the root of the S3Provider.

        Raises:
            S3DeletionError: Any S3 error encountered while deleting the object. Note: if the object is not found, s3 won't raise KeyError.
        """
        try:
            key = posixpath.join(self.path, path)
            self.client.delete_object(Bucket=self.bucket, Key=key)
        except Exception as err:
            raise S3DeletionError(err) from err

    def _list_keys(self):
        """Helper function that lists all the objects present at the root of the S3Provider.

        Follows continuation tokens, so results are not limited to a single
        list_objects_v2 page.

        Returns:
            list: names of all the objects found under the root, relative to the root.

        Raises:
            S3ListError: Any S3 error encountered while listing the objects.
        """
        try:
            prefix = self._key_prefix()
            request = {"Bucket": self.bucket, "Prefix": prefix}
            names = []
            while True:
                page = self.client.list_objects_v2(**request)
                # "Contents" is absent when the page holds no keys.
                # Every returned key starts with the prefix, so slicing by its
                # length yields the name relative to the root.
                names.extend(
                    item["Key"][len(prefix) :] for item in page.get("Contents", [])
                )
                if not page.get("IsTruncated"):
                    break
                request["ContinuationToken"] = page["NextContinuationToken"]
            return names
        except Exception as err:
            raise S3ListError(err) from err

    def __len__(self):
        """Returns the number of files present at the root of the S3Provider. This is an expensive operation.

        Returns:
            int: the number of files present inside the root.

        Raises:
            S3ListError: Any S3 error encountered while listing the objects.
        """
        return len(self._list_keys())

    def __iter__(self):
        """Generator function that iterates over the keys of the S3Provider.

        Yields:
            str: the name of the object that it is iterating over.

        Raises:
            S3ListError: Any S3 error encountered while listing the objects.
        """
        yield from self._list_keys()

    def clear(self):
        """Deletes ALL data on the s3 bucket (under self.path). Exercise caution!"""

        # much faster than mapper.clear()
        if self.resource is not None:
            bucket = self.resource.Bucket(self.bucket)
            # Slash-terminated prefix: never touch sibling prefixes that merely
            # share a leading substring with self.path.
            bucket.objects.filter(Prefix=self._key_prefix()).delete()
        else:
            super().clear()

Classes

class S3Provider (root: str, aws_access_key_id: Union[str, NoneType] = None, aws_secret_access_key: Union[str, NoneType] = None, aws_session_token: Union[str, NoneType] = None, endpoint_url: Union[str, NoneType] = None, aws_region: Union[str, NoneType] = None, max_pool_connections: Union[int, NoneType] = 10, client=None)

Provider class for using S3 storage.

Initializes the S3Provider

Example

s3_provider = S3Provider("snark-test/benchmarks")

Args

root : str
The root of the provider. All read/write request keys will be appended to root.
aws_access_key_id : optional, str
Specifies the AWS access key used as part of the credentials to authenticate the user.
aws_secret_access_key : optional, str
Specifies the AWS secret key used as part of the credentials to authenticate the user.
aws_session_token : optional, str
Specifies an AWS session token used as part of the credentials to authenticate the user.
endpoint_url : optional, str
The complete URL to use for the constructed client. This needs to be provided for cases in which you're interacting with MinIO, Wasabi, etc.
aws_region : optional, str
Specifies the AWS Region to send requests to.
max_pool_connections : optional, int
The maximum number of connections to keep in a connection pool. If this value is not set, the default value of 10 is used.
client : optional
boto3.client object. If this is passed, the other arguments except root are ignored and this is used as the client while making requests.
Expand source code
class S3Provider(StorageProvider):
    """Provider class for using S3 storage.

    Objects live in ``self.bucket`` under the key prefix ``self.path``; both are
    derived from the ``root`` passed to the constructor.
    """

    def __init__(
        self,
        root: str,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        endpoint_url: Optional[str] = None,
        aws_region: Optional[str] = None,
        max_pool_connections: Optional[int] = 10,
        client=None,
    ):
        """Initializes the S3Provider

        Example:
            s3_provider = S3Provider("snark-test/benchmarks")

        Args:
            root (str): The root of the provider, as "bucket/prefix" with an optional leading "s3://".
                All read/write request keys will be appended to root.
            aws_access_key_id (optional, str): Specifies the AWS access key used as part of the credentials to authenticate the user.
            aws_secret_access_key (optional, str): Specifies the AWS secret key used as part of the credentials to authenticate the user.
            aws_session_token (optional, str): Specifies an AWS session token used as part of the credentials to authenticate the user.
            endpoint_url (optional, str): The complete URL to use for the constructed client.
                This needs to be provided for cases in which you're interacting with MinIO, Wasabi, etc.
            aws_region (optional, str): Specifies the AWS Region to send requests to.
            max_pool_connections (optional, int): The maximum number of connections to keep in a connection pool.
                If this value is not set, the default value of 10 is used.
            client (optional): boto3.client object. If this is passed, it is used as the client while making requests
                and no boto3 client or resource is created from the credential arguments.
        """
        self.aws_region = aws_region
        self.endpoint_url = endpoint_url

        # Strip the scheme only when it leads the string; an "s3://" appearing
        # later would be part of the key and must be left alone.
        if root.startswith("s3://"):
            root = root[len("s3://") :]

        # The first segment is the bucket, everything after it is the key prefix.
        self.bucket, _, self.path = root.partition("/")

        self.client_config = botocore.config.Config(
            max_pool_connections=max_pool_connections,
        )

        # Shared by the client and the resource so the two cannot drift apart.
        session_kwargs = dict(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            config=self.client_config,
            endpoint_url=self.endpoint_url,
            region_name=self.aws_region,
        )

        self.client = client or boto3.client("s3", **session_kwargs)
        # The resource is only available when we own the credentials; with a
        # caller-supplied client, clear() falls back to per-key deletion.
        self.resource = None
        if client is None:
            self.resource = boto3.resource("s3", **session_kwargs)

    def _key_prefix(self):
        """Returns self.path terminated by "/" (or "" when there is no path).

        Listing with an un-terminated prefix such as "benchmarks" would also match
        sibling prefixes like "benchmarks2/", so list/delete requests use this form.
        """
        if not self.path or self.path.endswith("/"):
            return self.path
        return self.path + "/"

    def __setitem__(self, path, content):
        """Sets the object present at the path with the value

        Args:
            path (str): the path relative to the root of the S3Provider.
            content (bytes): the value to be assigned at the path.

        Raises:
            S3SetError: Any S3 error encountered while setting the value at the path.
        """
        try:
            key = posixpath.join(self.path, path)
            # Accept any buffer-protocol object and hand boto3 a plain byte buffer.
            body = bytearray(memoryview(content))
            self.client.put_object(
                Bucket=self.bucket,
                Body=body,
                Key=key,
                ContentType="application/octet-stream",  # signifies binary data
            )
        except Exception as err:
            raise S3SetError(err) from err

    def __getitem__(self, path):
        """Gets the object present at the path.

        Args:
            path (str): the path relative to the root of the S3Provider.

        Returns:
            bytes: The bytes of the object present at the path.

        Raises:
            KeyError: If an object is not found at the path.
            S3GetError: Any other error other than KeyError while retrieving the object.
        """
        try:
            key = posixpath.join(self.path, path)
            resp = self.client.get_object(
                Bucket=self.bucket,
                Key=key,
            )
            return resp["Body"].read()
        except botocore.exceptions.ClientError as err:
            # A missing object is reported as KeyError to honour the mapping contract.
            if err.response["Error"]["Code"] == "NoSuchKey":
                raise KeyError(err) from err
            raise S3GetError(err) from err
        except Exception as err:
            raise S3GetError(err) from err

    def __delitem__(self, path):
        """Delete the object present at the path.

        Args:
            path (str): the path to the object relative to the root of the S3Provider.

        Raises:
            S3DeletionError: Any S3 error encountered while deleting the object. Note: if the object is not found, s3 won't raise KeyError.
        """
        try:
            key = posixpath.join(self.path, path)
            self.client.delete_object(Bucket=self.bucket, Key=key)
        except Exception as err:
            raise S3DeletionError(err) from err

    def _list_keys(self):
        """Helper function that lists all the objects present at the root of the S3Provider.

        Follows continuation tokens, so results are not limited to a single
        list_objects_v2 page.

        Returns:
            list: names of all the objects found under the root, relative to the root.

        Raises:
            S3ListError: Any S3 error encountered while listing the objects.
        """
        try:
            prefix = self._key_prefix()
            request = {"Bucket": self.bucket, "Prefix": prefix}
            names = []
            while True:
                page = self.client.list_objects_v2(**request)
                # "Contents" is absent when the page holds no keys.
                # Every returned key starts with the prefix, so slicing by its
                # length yields the name relative to the root.
                names.extend(
                    item["Key"][len(prefix) :] for item in page.get("Contents", [])
                )
                if not page.get("IsTruncated"):
                    break
                request["ContinuationToken"] = page["NextContinuationToken"]
            return names
        except Exception as err:
            raise S3ListError(err) from err

    def __len__(self):
        """Returns the number of files present at the root of the S3Provider. This is an expensive operation.

        Returns:
            int: the number of files present inside the root.

        Raises:
            S3ListError: Any S3 error encountered while listing the objects.
        """
        return len(self._list_keys())

    def __iter__(self):
        """Generator function that iterates over the keys of the S3Provider.

        Yields:
            str: the name of the object that it is iterating over.

        Raises:
            S3ListError: Any S3 error encountered while listing the objects.
        """
        yield from self._list_keys()

    def clear(self):
        """Deletes ALL data on the s3 bucket (under self.path). Exercise caution!"""

        # much faster than mapper.clear()
        if self.resource is not None:
            bucket = self.resource.Bucket(self.bucket)
            # Slash-terminated prefix: never touch sibling prefixes that merely
            # share a leading substring with self.path.
            bucket.objects.filter(Prefix=self._key_prefix()).delete()
        else:
            super().clear()

Ancestors

  • StorageProvider
  • abc.ABC
  • collections.abc.MutableMapping
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Methods

def clear(self)

Deletes ALL data in the S3 bucket under the provider's root prefix (self.path). Exercise caution!

Expand source code
def clear(self):
    """Deletes ALL data on the s3 bucket (under self.path). Exercise caution!

    When a boto3 resource is available the objects are removed with a single
    filtered bulk delete; otherwise the inherited clear() is used.
    """
    if self.resource is None:
        # No resource to bulk-delete with: defer to the parent implementation.
        super().clear()
        return

    # Terminate the prefix with "/" so that clearing "benchmarks" cannot also
    # wipe sibling prefixes such as "benchmarks2/". An empty path stays empty
    # and still addresses the whole bucket.
    prefix = self.path
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    # much faster than mapper.clear()
    self.resource.Bucket(self.bucket).objects.filter(Prefix=prefix).delete()

Inherited members