Source code for dagster_dbt.rpc.types

from typing import Any, Mapping

import requests

from ..types import DbtOutput


[docs]class DbtRpcOutput(DbtOutput): """The output from executing a dbt command via the dbt RPC server. Attributes: result (Dict[str, Any]): The parsed contents of the "result" field of the JSON response from the rpc server (if any). response_dict (Dict[str, Any]): The entire contents of the JSON response from the rpc server. response (requests.Response): The original Response from which this output was generated. """ def __init__(self, response: requests.Response): self._response = response self._response_dict = response.json() super().__init__(result=self._response_dict.get("result", {})) @property def response(self) -> requests.Response: return self._response @property def response_dict(self) -> Mapping[str, Any]: return self._response_dict