from __future__ import annotations
from typing import Optional, List, Dict, Any, Literal, TYPE_CHECKING
from datetime import datetime
from dataclasses import dataclass
from httpx import HTTPStatusError
from pyfivetran.endpoints.base import Endpoint, Client, ApiDataclass
from pyfivetran.shed import (
GeneralApiResponse,
BASE_API_URL,
API_VERSION,
ApiError,
PaginatedApiResponse,
)
if TYPE_CHECKING:
pass
[docs]@dataclass
class Connector(ApiDataclass):
fivetran_id: str
service: str
schema: str
paused: bool
sync_frequency: int
pause_after_trial: bool
group_id: str
connected_by: str
service_version: int
created_at: datetime | str
data_delay_senstivity: Literal["LOW", "NORMAL", "HIGH", "CUSTOM"] = "NORMAL"
setup_tests: Optional[List[Dict[str, Any]]] = None
source_sync_details: Optional[Dict[str, Any]] = None
data_delay_threshold: Optional[int] = 0
connect_card: Optional[Dict[str, Any]] = None
status: Optional[Dict[str, Any]] = None
daily_sync_time: Optional[str] = None
succeeded_at: Optional[datetime | str] = None
failed_at: Optional[str | datetime] = None
schedule_type: Literal["auto", "manual"] = "auto"
connect_card_config: Optional[Dict[str, Any]] = None
config: Optional[Dict[str, Any]] = None
_is_deleted: bool = False
@property
def as_url(self) -> str:
return f"{BASE_API_URL}/{API_VERSION}/connectors/{self.fivetran_id}"
@property
def raw(self) -> Dict[str, Any]:
return self._raw if hasattr(self, "_raw") else self.__dict__ # type: ignore
[docs] def modify(
self,
config: Optional[Dict[str, Any]] = None,
auth: Optional[Dict[str, Any]] = None,
paused: Optional[bool] = None,
trust_certificates: Optional[bool] = None,
trust_fingerprints: Optional[bool] = None,
daily_sync_time: Optional[str] = None,
run_setup_tests: Optional[bool] = None,
sync_frequency: Optional[int] = None,
) -> GeneralApiResponse:
if sync_frequency and sync_frequency not in [
5,
15,
30,
60,
120,
180,
360,
480,
720,
1440,
]:
raise ApiError("Invalid sync_frequency value provided") from ValueError()
payload: Dict[str, Any] = dict()
if config is not None:
payload["config"] = config
if auth is not None:
payload["auth"] = auth
if paused is not None:
payload["paused"] = paused
if trust_certificates is not None:
payload["trust_certificates"] = trust_certificates
if trust_fingerprints is not None:
payload["trust_fingerprints"] = trust_fingerprints
if daily_sync_time is not None:
payload["daily_sync_time"] = daily_sync_time
if run_setup_tests is not None:
payload["run_setup_tests"] = run_setup_tests
if sync_frequency is not None:
payload["sync_frequency"] = sync_frequency
# moving this under the resp, so we only modify if the function succeeds
resp = self.endpoint._request(
method="PATCH", url=self.as_url, json=payload
).json()
for key, value in payload.items():
if value is not None:
setattr(self, key, value)
return resp
[docs] def modify_state(self, state: Dict[str, Any]) -> GeneralApiResponse:
"""
Modifies the connector state. This endpoint is only supported for
function connectors.
:param state: The state of the connector
:return: GeneralApiResponse
"""
return self.endpoint._request(
method="PATCH", url=f"{self.as_url}/state", json=state
).json()
[docs] def delete(self) -> GeneralApiResponse:
"""
Deletes the connector.
:return: GeneralApiResponse
"""
# TODO: need to adjust this to use the endpoint attribute
try:
obj = self.endpoint.client.delete(url=self.as_url)
obj.raise_for_status()
obj_json = obj.json()
self._is_deleted = True
except HTTPStatusError as e:
if obj.status_code == 404:
obj_json = None
else:
raise e
return obj_json
[docs] def resync(
self, scope: Optional[Dict[str, List[str]]] = None
) -> GeneralApiResponse:
"""
Resyncs the connector.
:param scope: A map containing an array of tables to re-sync for each schema, must be non-empty.
:return: GeneralApiResponse
"""
return self.endpoint._request(method="POST", url=f"{self.as_url}/resync").json()
[docs] def run_setup_tests(
self,
trust_certificates: Optional[bool] = None,
trust_fingerprints: Optional[bool] = None,
) -> GeneralApiResponse:
"""
Runs setup tests on the connector.
:param trust_certificates: Whether to trust certificates
:param trust_fingerprints: Whether to trust fingerprints
:return: GeneralApiResponse
"""
payload = dict()
if trust_certificates is not None:
payload["trust_certificates"] = trust_certificates
if trust_fingerprints is not None:
payload["trust_fingerprints"] = trust_fingerprints
return self.endpoint._request(
method="POST", url=f"{self.as_url}/test", json=payload
).json()
[docs] def sync(self, force: Optional[bool] = None) -> GeneralApiResponse:
"""
Triggers a data sync for an existing connector within your Fivetran account without waiting for the next scheduled sync.
This action does not override the standard sync frequency you defined in the Fivetran dashboard.
:param force: If force is true and the connector is currently syncing, it will stop the sync and re-run it.
:return: GeneralApiResponse
"""
if not force:
force = False
payload = dict(force=force)
return self.endpoint._request(
method="POST", url=f"{self.as_url}/sync", json=payload
).json()
[docs] @classmethod
def _from_dict(cls, endpoint, d: Dict[str, Any]) -> "Connector":
"""
Helper method for deserializing from a dict
:param d: The dict to deserialize
:return: The deserialized object
"""
# TODO: change this to the utility deserialize timestamp function
# convert to datetimes
# timestamps come in the format: 2019-08-24T14:15:22Z
if d.get("succeeded_at") and isinstance(d.get("succeeded_at"), str):
d["succeeded_at"] = datetime.strptime(
d.get("succeeded_at"), "%Y-%m-%dT%H:%M:%SZ"
) # type: ignore
if d.get("created_at") and isinstance(d.get("created_at"), str):
d["created_at"] = datetime.strptime(
d.get("created_at"), "%Y-%m-%dT%H:%M:%SZ"
) # type: ignore
if d.get("failed_at") and isinstance(d.get("failed_at"), str):
d["failed_at"] = datetime.strptime(d.get("failed_at"), "%Y-%m-%dT%H:%M:%SZ") # type: ignore
cls_to_return = cls(
fivetran_id=d.get("id"), # type: ignore
service=d.get("service"), # type: ignore
schema=d.get("schema"), # type: ignore
paused=d.get("paused"), # type: ignore
status=d.get("status"),
daily_sync_time=d.get("daily_sync_time"),
succeeded_at=d.get("succeeded_at"),
connect_card=d.get("connect_card"),
sync_frequency=d.get("sync_frequency"), # type: ignore
pause_after_trial=d.get("pause_after_trial"), # type: ignore
data_delay_threshold=d.get("data_delay_threshold"),
group_id=d.get("group_id"), # type: ignore
connected_by=d.get("connected_by"), # type: ignore
setup_tests=d.get("setup_tests"),
source_sync_details=d.get("source_sync_details"),
service_version=d.get("service_version"), # type: ignore
created_at=d.get("created_at"), # type: ignore
failed_at=d.get("failed_at"),
schedule_type=d.get("schedule_type"), # type: ignore
connect_card_config=d.get("connect_card_config"),
config=d.get("config"),
_is_deleted=False,
endpoint=endpoint,
)
setattr(cls_to_return, "_raw", d)
return cls_to_return
[docs]class ConnectorEndpoint(Endpoint):
BASE_URL: str = BASE_API_URL + "/" + API_VERSION
def __init__(self, client: Client) -> None:
self.client: Client = client
super().__init__(client)
[docs] def connect_card(
self,
connector_id: str,
redirect_uri: str,
hide_setup_guide: Optional[bool] = None,
) -> GeneralApiResponse:
"""
Generates the Connect Card URI for the connector.
:param connector_id: Unique ID of the connector in Fivetran
:param redirect_uri: Redirect URI for the connector
:param hide_setup_guide: Whether to hide the setup guide
:return: GeneralApiResponse
"""
payload: Dict[str, Any] = {"redirect_uri": redirect_uri}
if hide_setup_guide is not None:
payload["hide_setup_guide"] = hide_setup_guide
return self._request(
method="POST",
url=f"{self.BASE_URL}/connectors/{connector_id}/connect-card",
json=payload,
).json()
[docs] def get_connector(self, connector_id: str) -> Connector:
"""
Gets a connector.
:param connector_id: Unique ID of the connector in Fivetran
:return: Connector
"""
return Connector._from_dict(
endpoint=self,
d=self._request(
method="GET", url=f"{self.BASE_URL}/connectors/{connector_id}"
).json()["data"],
)
# TODO: maybe move this method into the Connector class?
[docs] def get_connector_state(self, connector_id: str) -> GeneralApiResponse:
"""
Return connector state. Only supported for function connectors.
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
return self._request(
method="GET", url=f"{self.BASE_URL}/connectors/{connector_id}/state"
).json()
[docs] def create_connector(self, connector: Connector | dict[str, Any]) -> Connector:
"""
Creates a new connector.
:param connector: Connector
:return: Connector
"""
if isinstance(connector, Connector):
connector = connector.raw
for k, v in connector.items():
if v is None:
del connector[k]
return Connector._from_dict(
endpoint=self,
d=self._request(
method="POST",
url=f"{self.BASE_URL}/connectors",
json=connector,
).json()["data"],
)