from __future__ import annotations
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
from pyfivetran.endpoints.base import Endpoint, Client, ApiDataclass
from pyfivetran.shed import (
GeneralApiResponse,
BASE_API_URL,
API_VERSION,
PaginatedApiResponse,
)
from pyfivetran.utils import deserialize_timestamp
[docs]@dataclass
class Group(ApiDataclass):
fivetran_id: str
name: str
created_at: datetime
_is_deleted: bool = False
@property
def as_url(self) -> str:
return f"{BASE_API_URL}/{API_VERSION}/groups/{self.fivetran_id}"
@property
def raw(self) -> Dict[str, Any]:
return self._raw if hasattr(self, "_raw") else self.__dict__
@property
def public_key(self) -> str:
"""
Public key from SSH key pair.
"""
if not hasattr(self, "_public_key"):
resp: GeneralApiResponse = self.endpoint._request(
method="GET", url=f"{self.as_url}/public-key"
).json()
self._public_key: str = resp["data"]["public_key"] # type: ignore
return self._public_key
@property
def service_account(self) -> str:
"""
Fivetran service account associated with the group.
"""
if not hasattr(self, "_service_account"):
resp: GeneralApiResponse = self.endpoint._request(
method="GET", url=f"{self.as_url}/service-account"
).json()
self._service_account: str = resp["data"]["service_account"] # type: ignore
return self._service_account
[docs] @classmethod
def _from_dict(cls, endpoint, d: Dict[str, Any]) -> "Group":
"""
Helper method for deserializing from a dict
:param d: The dict to deserialize
:return: The deserialized object
"""
cls_to_return = cls(
endpoint=endpoint,
fivetran_id=d["id"],
name=d["name"],
created_at=deserialize_timestamp(d["created_at"]),
)
setattr(cls_to_return, "_raw", d)
return cls_to_return
[docs] def delete(self) -> GeneralApiResponse:
"""
Deletes a group in your Fivetran account.
:return: GeneralApiResponse
"""
return self.endpoint._request(method="DELETE", url=self.as_url).json()
[docs] def modify(self, name: str) -> GeneralApiResponse:
"""
Modifies a group in your Fivetran account.
:param name: The name of the group
:return: GeneralApiResponse
"""
payload = dict(name=name)
return self.endpoint._request(
method="PATCH", url=self.as_url, json=payload
).json()
[docs] def add_user(
self, email: Optional[str] = None, role: Optional[str] = None
) -> GeneralApiResponse:
"""
Adds an existing user to a group in your Fivetran account.
:param email: The email of the user to add
:param role: The role of the user to add
:return: GeneralApiResponse
"""
payload = dict()
if not email and not role:
raise ValueError("Either email or role must be provided")
if email is not None:
payload["email"] = email
if role is not None:
payload["role"] = role
return self.endpoint._request(
method="POST", url=f"{self.as_url}/users", json=payload
).json()
# TODO: change this to serialize to the Connector dataclass
[docs] def list_connectors(
self, schema: Optional[str] = None, limit: Optional[int] = None
) -> List[PaginatedApiResponse]:
"""
Returns a list of connectors in a group in your Fivetran account.
:param schema: The schema of the connector
:param limit: The number of records to return
:return: List[PaginatedApiResponse]
"""
params: Dict[str, str | int] = dict()
if schema is not None:
params["schema"] = schema
if limit is not None:
params["limit"] = limit
resp_list = self.endpoint._paginate(
first_response=self.endpoint._request(
method="GET", url=f"{self.as_url}/connectors", params=params
),
endpoint=f"{self.as_url}/connectors",
limit=limit,
)
return list(map(lambda x: x.json(), resp_list))
# TODO: serialize to the User dataclass
[docs] def list_users(self, limit: Optional[int] = None) -> List[PaginatedApiResponse]:
"""
Returns a list of users in a group in your Fivetran account.
:param limit: The number of records to return
:return: List[PaginatedApiResponse]
"""
params = dict()
if limit is not None:
params["limit"] = limit
resp_list = self.endpoint._paginate(
first_response=self.endpoint._request(
method="GET", url=f"{self.as_url}/users", params=params
),
endpoint=f"{self.as_url}/users",
limit=limit,
)
return list(map(lambda x: x.json(), resp_list))
# TODO: serialize to User dataclass
[docs] def remove_user(self, user_id: str) -> GeneralApiResponse:
"""
Removes a user from a group in your Fivetran account.
:param user_id: The ID of the user to remove
:return: GeneralApiResponse
"""
return self.endpoint._request(
method="DELETE", url=f"{self.as_url}/users/{user_id}"
).json()
[docs]class GroupEndpoint(Endpoint):
BASE_URL: str = BASE_API_URL + "/" + API_VERSION
def __init__(self, client: Client) -> None:
self.client: Client = client
super().__init__(client)
[docs] def create_group(self, name: str) -> Group:
"""
Creates a new group in your fivetran account.
:param name: The name of the group
:return: Group
"""
payload = dict(name=name)
response: GeneralApiResponse = self._request(
method="POST", url=f"{self.BASE_URL}/groups", json=payload
).json()
return Group._from_dict(self, response["data"]) # type: ignore
[docs] def list_groups(self, limit: Optional[int] = None) -> List[Group]:
"""
Returns a list of groups in your Fivetran account.
:param limit: The number of records to return
:return: List[PaginatedApiResponse]
"""
params = dict()
if limit is not None:
params["limit"] = limit
resp_list = self._paginate(
first_response=self._request(
method="GET", url=f"{self.BASE_URL}/groups", params=params
),
endpoint=f"{self.BASE_URL}/groups",
limit=limit,
)
jsons: List[PaginatedApiResponse] = list(map(lambda x: x.json(), resp_list))
group_jsons: List[Dict[str, Any]] = []
for json in jsons:
group_jsons.extend(json.get("data").get("items")) # type: ignore
return list(map(lambda x: Group._from_dict(self, x), group_jsons))
[docs] def get_group(self, group_id: str) -> Group:
"""
Gets a group in your Fivetran account.
:param group_id: The ID of the group
:return: Group
"""
resp: GeneralApiResponse = self._request(
method="GET", url=f"{self.BASE_URL}/groups/{group_id}"
).json()
return Group._from_dict(
self,
resp["data"], # type: ignore
)