Source code for dagster_aws.s3.resources

from dagster import Field, StringSource, resource
from dagster._utils.merger import merge_dicts

from .file_manager import S3FileManager
from .utils import construct_s3_client

# Config schema shared by every resource in this module that builds an S3 client.
# All keys are optional; the ones without a ``default_value`` are simply absent
# from the resolved config when the user does not supply them.
S3_SESSION_CONFIG = {
    # Off by default: requests are signed unless the user opts out.
    "use_unsigned_session": Field(
        bool,
        default_value=False,
        is_required=False,
        description="Specifies whether to use an unsigned S3 session",
    ),
    "region_name": Field(
        str,
        is_required=False,
        description="Specifies a custom region for the S3 session",
    ),
    # StringSource (rather than plain str) is used for this key only.
    "endpoint_url": Field(
        StringSource,
        is_required=False,
        description="Specifies a custom endpoint for the S3 session",
    ),
    "max_attempts": Field(
        int,
        default_value=5,
        is_required=False,
        description=(
            "This provides Boto3's retry handler with a value of maximum retry attempts, where the "
            "initial call counts toward the max_attempts value that you provide"
        ),
    ),
    "profile_name": Field(
        str,
        is_required=False,
        description="Specifies a profile to connect that session",
    ),
}


@resource(S3_SESSION_CONFIG)
def s3_resource(context):
    """Resource that gives access to S3.

    The underlying S3 session is created by calling
    :py:func:`boto3.session.Session(profile_name) <boto3:boto3.session>`.
    The returned resource object is an S3 client, an instance of `botocore.client.S3`.

    Example:

        .. code-block:: python

            from dagster import job, op
            from dagster_aws.s3 import s3_resource

            @op(required_resource_keys={'s3'})
            def example_s3_op(context):
                return context.resources.s3.list_objects_v2(
                    Bucket='my-bucket',
                    Prefix='some-key'
                )

            @job(resource_defs={'s3': s3_resource})
            def example_job():
                example_s3_op()

            example_job.execute_in_process(
                run_config={
                    'resources': {
                        's3': {
                            'config': {
                                'region_name': 'us-west-1',
                            }
                        }
                    }
                }
            )

    Note that your ops must also declare that they require this resource with
    `required_resource_keys`, or it will not be initialized for the execution of their compute
    functions.

    You may configure this resource as follows:

    .. code-block:: YAML

        resources:
          s3:
            config:
              region_name: "us-west-1"
              # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
              # through the ordinary boto credential chain.
              use_unsigned_session: false
              # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: False
              endpoint_url: "http://localhost"
              # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
              max_attempts: 5
              # Optional[int]: Maximum number of attempts handed to Boto3's retry handler; the
              # initial call counts toward this value. Default: 5
              profile_name: "dev"
              # Optional[str]: Specifies a custom profile for S3 session. Default is default
              # profile as specified in ~/.aws/credentials file

    """
    config = context.resource_config
    # Keys with a schema default are always present and are indexed directly;
    # the remaining optional keys may be missing, so they are read with .get()
    # and forwarded as None when unset.
    return construct_s3_client(
        max_attempts=config["max_attempts"],
        region_name=config.get("region_name"),
        endpoint_url=config.get("endpoint_url"),
        use_unsigned_session=config["use_unsigned_session"],
        profile_name=config.get("profile_name"),
    )
@resource(
    merge_dicts(
        S3_SESSION_CONFIG,
        {
            "s3_bucket": Field(StringSource),
            "s3_prefix": Field(StringSource, is_required=False, default_value="dagster"),
        },
    )
)
def s3_file_manager(context):
    """FileManager that provides abstract access to S3.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.

    The config schema is the shared S3 session config extended with two keys:
    ``s3_bucket`` (required) and ``s3_prefix`` (optional, defaults to ``"dagster"``),
    which is passed to the file manager as its base key.
    """
    settings = context.resource_config

    # Build the client first; optional session keys that were not supplied are
    # forwarded as None.
    client = construct_s3_client(
        max_attempts=settings["max_attempts"],
        region_name=settings.get("region_name"),
        endpoint_url=settings.get("endpoint_url"),
        use_unsigned_session=settings["use_unsigned_session"],
        profile_name=settings.get("profile_name"),
    )

    return S3FileManager(
        s3_session=client,
        s3_bucket=settings["s3_bucket"],
        s3_base_key=settings["s3_prefix"],
    )