Source code for dagster._core.event_api

from datetime import datetime
from typing import Mapping, NamedTuple, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.events import AssetKey
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.events import DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._serdes import whitelist_for_serdes


[docs]class RunShardedEventsCursor(NamedTuple): """Pairs an id-based event log cursor with a timestamp-based run cursor, for improved performance on run-sharded event log storages (e.g. the default SqliteEventLogStorage). For run-sharded storages, the id field is ignored, since they may not be unique across shards """ id: int run_updated_after: datetime
[docs]class EventLogRecord(NamedTuple): """Internal representation of an event record, as stored in a :py:class:`~dagster._core.storage.event_log.EventLogStorage`. Users should not instantiate this class directly. """ storage_id: PublicAttr[int] event_log_entry: PublicAttr[EventLogEntry] @property def run_id(self) -> str: return self.event_log_entry.run_id @property def timestamp(self) -> float: return self.event_log_entry.timestamp @property def asset_key(self) -> Optional[AssetKey]: dagster_event = self.event_log_entry.dagster_event if dagster_event: return dagster_event.asset_key return None @property def partition_key(self) -> Optional[str]: dagster_event = self.event_log_entry.dagster_event if dagster_event: return dagster_event.partition return None
[docs]@whitelist_for_serdes class EventRecordsFilter( NamedTuple( "_EventRecordsFilter", [ ("event_type", DagsterEventType), ("asset_key", Optional[AssetKey]), ("asset_partitions", Optional[Sequence[str]]), ("after_cursor", Optional[Union[int, RunShardedEventsCursor]]), ("before_cursor", Optional[Union[int, RunShardedEventsCursor]]), ("after_timestamp", Optional[float]), ("before_timestamp", Optional[float]), ("storage_ids", Optional[Sequence[int]]), ("tags", Optional[Mapping[str, Union[str, Sequence[str]]]]), ], ) ): """Defines a set of filter fields for fetching a set of event log entries or event log records. Args: event_type (DagsterEventType): Filter argument for dagster event type asset_key (Optional[AssetKey]): Asset key for which to get asset materialization event entries / records. asset_partitions (Optional[List[str]]): Filter parameter such that only asset materialization events with a partition value matching one of the provided values. Only valid when the `asset_key` parameter is provided. after_cursor (Optional[Union[int, RunShardedEventsCursor]]): Filter parameter such that only records with storage_id greater than the provided value are returned. Using a run-sharded events cursor will result in a significant performance gain when run against a SqliteEventLogStorage implementation (which is run-sharded) before_cursor (Optional[Union[int, RunShardedEventsCursor]]): Filter parameter such that records with storage_id less than the provided value are returned. Using a run-sharded events cursor will result in a significant performance gain when run against a SqliteEventLogStorage implementation (which is run-sharded) after_timestamp (Optional[float]): Filter parameter such that only event records for events with timestamp greater than the provided value are returned. before_timestamp (Optional[float]): Filter parameter such that only event records for events with timestamp less than the provided value are returned. """ def __new__( cls, event_type: DagsterEventType, asset_key: Optional[AssetKey] = None, asset_partitions: Optional[Sequence[str]] = None, after_cursor: Optional[Union[int, RunShardedEventsCursor]] = None, before_cursor: Optional[Union[int, RunShardedEventsCursor]] = None, after_timestamp: Optional[float] = None, before_timestamp: Optional[float] = None, storage_ids: Optional[Sequence[int]] = None, tags: Optional[Mapping[str, Union[str, Sequence[str]]]] = None, ): check.opt_list_param(asset_partitions, "asset_partitions", of_type=str) check.inst_param(event_type, "event_type", DagsterEventType) tags = check.opt_mapping_param(tags, "tags", key_type=str) if tags and event_type is not DagsterEventType.ASSET_MATERIALIZATION: raise DagsterInvalidInvocationError( "Can only filter by tags for asset materialization events" ) # type-ignores work around mypy type inference bug return super(EventRecordsFilter, cls).__new__( cls, event_type=event_type, asset_key=check.opt_inst_param(asset_key, "asset_key", AssetKey), asset_partitions=asset_partitions, after_cursor=check.opt_inst_param( # type: ignore after_cursor, "after_cursor", (int, RunShardedEventsCursor) ), before_cursor=check.opt_inst_param( # type: ignore before_cursor, "before_cursor", (int, RunShardedEventsCursor) ), after_timestamp=check.opt_float_param(after_timestamp, "after_timestamp"), before_timestamp=check.opt_float_param(before_timestamp, "before_timestamp"), storage_ids=check.opt_list_param(storage_ids, "storage_ids", of_type=int), tags=check.opt_mapping_param(tags, "tags", key_type=str), )