Source code for dagster._core.definitions.partitioned_schedule

from typing import Mapping, Optional, Union, cast

import dagster._check as check

from .decorators.schedule_decorator import schedule
from .job_definition import JobDefinition
from .run_request import SkipReason
from .schedule_definition import DefaultScheduleStatus, ScheduleDefinition
from .time_window_partitions import TimeWindowPartitionsDefinition
from .unresolved_asset_job_definition import UnresolvedAssetJobDefinition


[docs]def build_schedule_from_partitioned_job( job: Union[JobDefinition, UnresolvedAssetJobDefinition], description: Optional[str] = None, name: Optional[str] = None, minute_of_hour: Optional[int] = None, hour_of_day: Optional[int] = None, day_of_week: Optional[int] = None, day_of_month: Optional[int] = None, default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED, tags: Optional[Mapping[str, str]] = None, ) -> ScheduleDefinition: """ Creates a schedule from a time window-partitioned job. The schedule executes at the cadence specified by the partitioning of the given job. """ check.invariant( not (day_of_week and day_of_month), "Cannot provide both day_of_month and day_of_week parameter to build_schedule_from_partitioned_job.", ) partitions_def = job.partitions_def if partitions_def is None: check.failed("The provided job is not partitioned") if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed( "The provided job's partitions definition is not a TimeWindowPartitionsDefinition" ) time_window_partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) cron_schedule = time_window_partitions_def.get_cron_schedule( minute_of_hour, hour_of_day, day_of_week, day_of_month ) @schedule( cron_schedule=cron_schedule, job=job, default_status=default_status, execution_timezone=time_window_partitions_def.timezone, name=check.opt_str_param(name, "name", f"{job.name}_schedule"), description=check.opt_str_param(description, "description"), ) def schedule_def(context): partition_keys = partitions_def.get_partition_keys(context.scheduled_execution_time) if len(partition_keys) == 0: return SkipReason("The job's PartitionsDefinition has no partitions") # Run for the latest partition. Prior partitions will have been handled by prior ticks. partition_key = partition_keys[-1] yield job.run_request_for_partition( partition_key=partition_key, run_key=partition_key, tags=tags ) return schedule_def
schedule_from_partitions = build_schedule_from_partitioned_job