from __future__ import annotations
from typing import Optional, List
from pyfivetran.endpoints.base import Endpoint, Client
from pyfivetran.shed import (
GeneralApiResponse,
BASE_API_URL,
API_VERSION,
PaginatedApiResponse,
)
[docs]class CertificateEndpoint(Endpoint):
BASE_URL: str = BASE_API_URL + "/" + API_VERSION
def __init__(self, client: Client) -> None:
self.client = client
super().__init__(client)
[docs] def approve_certificate(
self,
hash: str,
encoded_cert: str | bytes,
destination_id: Optional[str] = None,
connector_id: Optional[str] = None,
) -> GeneralApiResponse:
"""
Approves a certificate for a connector/destination, so Fivetran trusts this certificate for a source/destination database.
The connector/destination setup tests will fail if a non-approved certificate is provided.
:param hash: Hash of the certificate
:param encoded_cert: The certificate encoded in base64, as a string or bytes
:param destination_id: Unique ID of the destination in Fivetran
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
if isinstance(encoded_cert, bytes):
# just have to serialize it as a string for the API
encoded_cert = str(encoded_cert)
payload = {"hash": hash, "encoded_cert": encoded_cert}
if destination_id:
payload["destination_id"] = destination_id
if connector_id:
payload["connector_id"] = connector_id
return self._request(
method="POST", url=f"{self.BASE_URL}/certificates", json=payload
).json()
[docs] def approve_fingerprint(
self,
hash: str,
public_key: str | bytes,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
) -> GeneralApiResponse:
"""
Approves a fingerprint for a connector/destination, so Fivetran trusts this fingerprint for a source/destination database.
The connector/destination setup tests will fail if a non-approved fingerprint is provided.
:param hash: Hash of the fingerprint
:param public_key: The fingerprint encoded in base64, as a string or bytes
:param destination_id: Unique ID of the destination in Fivetran
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
if isinstance(public_key, bytes):
# just have to serialize it as a string for the API
public_key = str(public_key)
payload = {"hash": hash, "public_key": public_key}
if connector_id:
endpoint_ = f"/connectors/{connector_id}/fingerprints"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/fingerprints"
else:
raise ValueError("Either connector_id or destination_id must be provided")
return self._request(
method="POST", url=f"{self.BASE_URL}{endpoint_}", json=payload
).json()
[docs] def get_certificate_details(
self,
hash: str | bytes,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
) -> GeneralApiResponse:
"""
Get certificate details.
:param hash: Hash of the certificate
:param destination_id: Unique ID of the destination in Fivetran
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
if isinstance(hash, bytes):
# just have to serialize it as a string for the API
hash = str(hash)
if connector_id:
endpoint_ = f"/connectors/{connector_id}/certificates/{hash}"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/certificates/{hash}"
else:
raise ValueError("Either connector_id or destination_id must be provided")
return self._request(method="GET", url=f"{self.BASE_URL}{endpoint_}").json()
[docs] def get_fingerprint_details(
self,
hash: str | bytes,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
) -> GeneralApiResponse:
"""
Get fingerprint details.
:param hash: Hash of the fingerprint
:param destination_id: Unique ID of the destination in Fivetran
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
if isinstance(hash, bytes):
# just have to serialize it as a string for the API
hash = str(hash)
if connector_id:
endpoint_ = f"/connectors/{connector_id}/fingerprints/{hash}"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/fingerprints/{hash}"
else:
raise ValueError("Either connector_id or destination_id must be provided")
return self._request(method="GET", url=f"{self.BASE_URL}{endpoint_}").json()
[docs] def get_fingerprints(
self,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
limit: Optional[int] = None,
) -> List[PaginatedApiResponse]:
"""
Get all fingerprints for a connector or destination.
:param connector_id: Unique ID of the connector in Fivetran
:param destination_id: Unique ID of the destination in Fivetran
:param limit: Number of results to return
:return: List[PaginatedApiResponse]
"""
if connector_id:
endpoint_ = f"/connectors/{connector_id}/fingerprints"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/fingerprints"
else:
raise ValueError("Either connector_id or destination_id must be provided")
if not limit:
limit = 100
params = {"limit": limit}
first_response = self._request(
method="GET", url=f"{self.BASE_URL}{endpoint_}", params=params
)
resp = self._paginate(
first_response=first_response, endpoint=endpoint_, limit=limit
)
return list(map(lambda x: x.json(), resp))
[docs] def get_certificates(
self,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
limit: Optional[int] = None,
) -> List[PaginatedApiResponse]:
"""
Get all certificates for a connector or destination.
:param connector_id: Unique ID of the connector in Fivetran
:param destination_id: Unique ID of the destination in Fivetran
:param limit: Number of results to return
:return: List[PaginatedApiResponse]
"""
if connector_id:
endpoint_ = f"/connectors/{connector_id}/certificates"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/certificates"
else:
raise ValueError("Either connector_id or destination_id must be provided")
if not limit:
limit = 100
params = {"limit": limit}
first_response = self._request(
method="GET", url=f"{self.BASE_URL}{endpoint_}", params=params
)
resp = self._paginate(
first_response=first_response,
endpoint=f"{self.BASE_URL}{endpoint_}",
limit=limit,
)
return list(map(lambda x: x.json(), resp))
[docs] def revoke_certificate(
self,
hash: str | bytes,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
) -> GeneralApiResponse:
"""
Revoke a certificate.
:param hash: Hash of the certificate
:param destination_id: Unique ID of the destination in Fivetran
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
if isinstance(hash, bytes):
# just have to serialize it as a string for the API
hash = str(hash)
if connector_id:
endpoint_ = f"/connectors/{connector_id}/certificates/{hash}"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/certificates/{hash}"
else:
raise ValueError("Either connector_id or destination_id must be provided")
return self._request(method="DELETE", url=f"{self.BASE_URL}{endpoint_}").json()
[docs] def revoke_fingerprint(
self,
hash: str | bytes,
connector_id: Optional[str] = None,
destination_id: Optional[str] = None,
) -> GeneralApiResponse:
"""
Revoke a fingerprint.
:param hash: Hash of the fingerprint
:param destination_id: Unique ID of the destination in Fivetran
:param connector_id: Unique ID of the connector in Fivetran
:return: GeneralApiResponse
"""
if isinstance(hash, bytes):
# just have to serialize it as a string for the API
hash = str(hash)
if connector_id:
endpoint_ = f"/connectors/{connector_id}/fingerprints/{hash}"
elif destination_id:
endpoint_ = f"/destinations/{destination_id}/fingerprints/{hash}"
else:
raise ValueError("Either connector_id or destination_id must be provided")
return self._request(method="DELETE", url=f"{self.BASE_URL}{endpoint_}").json()