from __future__ import annotations
from typing import Optional, Dict, Any, Literal
from pyfivetran.endpoints.base import Endpoint, Client
from pyfivetran.shed import GeneralApiResponse, BASE_API_URL, API_VERSION, ApiError
class ConnectorSchemaEndpoint(Endpoint):
    """
    Endpoint wrapper for the connector schema-config routes of the Fivetran API.

    Every method builds a URL below ``BASE_URL``/connectors/{connector_id}/schemas,
    issues the request through ``self._request`` and returns the result of calling
    ``.json()`` on the response.
    """

    BASE_URL: str = BASE_API_URL + "/" + API_VERSION

    def __init__(self, client: Client) -> None:
        """
        :param client: Client handed to the base ``Endpoint`` and kept on ``self.client``
        """
        self.client = client
        super().__init__(client)

    def modify_connector_column_config(
        self,
        connector_id: str,
        schema_name: str,
        table_name: str,
        column_name: str,
        enabled: Optional[bool] = None,
        hashed: Optional[bool] = None,
    ) -> GeneralApiResponse:
        """
        Modifies the column configuration for a specific connector.

        :param connector_id: Unique ID of the connector in Fivetran
        :param schema_name: The schema name within the Fivetran system
        :param table_name: The table name within the Fivetran system
        :param column_name: The column name within the Fivetran system
        :param enabled: Whether the column is enabled
        :param hashed: Whether the column is hashed
        :raises ApiError: If neither ``enabled`` nor ``hashed`` is provided
        :return: GeneralApiResponse
        """
        # Only explicitly supplied fields are sent, so omitted ones stay untouched.
        payload: Dict[Any, Any] = {}
        if enabled is not None:
            payload["enabled"] = enabled
        if hashed is not None:
            payload["hashed"] = hashed

        # A PATCH without any field would be a no-op; fail before hitting the API.
        if not payload:
            raise ApiError("No payload provided to modify_connector_column_config")

        return self._request(
            method="PATCH",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas/{schema_name}/tables/{table_name}/columns/{column_name}",
            json=payload,
        ).json()

    def modify_connector_database_schema_config(
        self,
        connector_id: str,
        schema_name: str,
        enabled: Optional[bool] = None,
        tables: Optional[Dict[Any, Any]] = None,
    ) -> GeneralApiResponse:
        """
        Modify connector database schema config.

        :param connector_id: Unique ID of the connector in Fivetran
        :param schema_name: The schema name within the Fivetran system
        :param enabled: Whether the schema is enabled
        :param tables: The tables within the schema
        :raises ApiError: If neither ``enabled`` nor ``tables`` is provided
        :return: GeneralApiResponse
        """
        # Only explicitly supplied fields are sent, so omitted ones stay untouched.
        payload: Dict[Any, Any] = {}
        if enabled is not None:
            payload["enabled"] = enabled
        if tables is not None:
            payload["tables"] = tables

        # A PATCH without any field would be a no-op; fail before hitting the API.
        if not payload:
            raise ApiError(
                "No payload provided to modify_connector_database_schema_config"
            )

        return self._request(
            method="PATCH",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas/{schema_name}",
            json=payload,
        ).json()

    def modify_connector_schema_config(
        self,
        connector_id: str,
        schemas: Optional[Dict[Any, Any]] = None,
        schema_change_handling: Optional[
            Literal["ALLOW_ALL", "ALLOW_COLUMNS", "BLOCK_ALL"]
        ] = None,
    ) -> GeneralApiResponse:
        """
        Updates the schema configuration for a specific connector.

        :param connector_id: Unique ID of the connector in Fivetran
        :param schemas: The schemas within the Fivetran system
        :param schema_change_handling: The schema change handling; one of
            ``ALLOW_ALL``, ``ALLOW_COLUMNS`` or ``BLOCK_ALL`` when given
        :raises ApiError: If ``schema_change_handling`` is given but is not one of
            the accepted values, or if no argument to update is provided
        :return: GeneralApiResponse
        """
        # Validate only a supplied value: None means "leave unchanged" and must
        # not be rejected, otherwise updating ``schemas`` alone is impossible.
        if schema_change_handling is not None and schema_change_handling not in (
            "ALLOW_ALL",
            "ALLOW_COLUMNS",
            "BLOCK_ALL",
        ):
            raise ApiError(
                "Invalid schema_change_handling value provided"
            ) from ValueError()

        # Only explicitly supplied fields are sent, so omitted ones stay untouched.
        payload: Dict[Any, Any] = {}
        if schemas is not None:
            payload["schemas"] = schemas
        if schema_change_handling is not None:
            payload["schema_change_handling"] = schema_change_handling

        # A PATCH without any field would be a no-op; fail before hitting the API.
        if not payload:
            raise ApiError("No payload provided to modify_connector_schema_config")

        return self._request(
            method="PATCH",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas",
            json=payload,
        ).json()

    def modify_table_config(
        self,
        connector_id: str,
        schema_name: str,
        table_name: str,
        enabled: Optional[bool] = None,
        columns: Optional[Dict[Any, Any]] = None,
        sync_mode: Optional[Literal["SOFT_DELETE", "HISTORY", "LIVE"]] = None,
    ) -> GeneralApiResponse:
        """
        Updates the table configuration for a specific connector.

        :param connector_id: Unique ID of the connector in Fivetran
        :param schema_name: The schema name within the Fivetran system
        :param table_name: The table name within the Fivetran system
        :param enabled: Whether the table is enabled
        :param columns: The columns within the table
        :param sync_mode: The sync mode for the table; one of ``SOFT_DELETE``,
            ``HISTORY`` or ``LIVE`` when given
        :raises ApiError: If ``sync_mode`` is given but is not one of the accepted
            values, or if no argument to update is provided
        :return: GeneralApiResponse
        """
        # Validate only a supplied value: None means "leave unchanged" and must
        # not be rejected, otherwise updating ``enabled``/``columns`` alone fails.
        if sync_mode is not None and sync_mode not in (
            "SOFT_DELETE",
            "HISTORY",
            "LIVE",
        ):
            raise ApiError("Invalid sync_mode value provided") from ValueError()

        # Only explicitly supplied fields are sent, so omitted ones stay untouched.
        payload: Dict[Any, Any] = {}
        if enabled is not None:
            payload["enabled"] = enabled
        if columns is not None:
            payload["columns"] = columns
        if sync_mode is not None:
            payload["sync_mode"] = sync_mode

        # A PATCH without any field would be a no-op; fail before hitting the API.
        if not payload:
            raise ApiError("No payload provided to modify_table_config")

        return self._request(
            method="PATCH",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas/{schema_name}/tables/{table_name}",
            json=payload,
        ).json()

    def resync_connector_table_data(
        self, connector_id: str, **prop_kwargs: Optional[Dict[str, Any]]
    ) -> GeneralApiResponse:
        """
        Triggers a historical sync of all data for multiple schema tables within a connector.
        This action does not override the standard sync frequency you defined in the Fivetran dashboard.

        :param connector_id: Unique ID of the connector in Fivetran
        :param prop_kwargs: Keyword arguments forwarded unchanged as the JSON
            request body (presumably schema names mapped to the tables to
            resync -- confirm against the Fivetran API reference)
        :return: GeneralApiResponse
        """
        return self._request(
            method="POST",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas/tables/resync",
            json=prop_kwargs,
        ).json()

    def reload_connector_schema_config(
        self, connector_id: str, exclude_mode: Optional[str] = None
    ) -> GeneralApiResponse:
        """
        Reloads the schema configuration for a specific connector.

        :param connector_id: Unique ID of the connector in Fivetran
        :param exclude_mode: The exclude mode. When omitted, an empty JSON object
            is sent and the choice is left to the API's own default
            (NOTE(review): believed to be optional on the Fivetran side --
            confirm against the API reference)
        :return: GeneralApiResponse
        """
        # ``exclude_mode`` is the only body field and it is optional, so an
        # empty payload is a legitimate request here rather than an error.
        payload: Dict[Any, Any] = {}
        if exclude_mode is not None:
            payload["exclude_mode"] = exclude_mode

        return self._request(
            method="POST",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas/reload",
            json=payload,
        ).json()

    def get_connector_schema_config(self, connector_id: str) -> GeneralApiResponse:
        """
        Get the connector schema config for an existing connector within your Fivetran account.

        :param connector_id: Unique ID of the connector in Fivetran
        :return: GeneralApiResponse
        """
        return self._request(
            method="GET", url=f"{self.BASE_URL}/connectors/{connector_id}/schemas"
        ).json()

    def get_source_table_columns_config(
        self, connector_id: str, schema: str, table: str
    ) -> GeneralApiResponse:
        """
        Get the source table columns config for an existing connector within your Fivetran account.

        :param connector_id: Unique ID of the connector in Fivetran
        :param schema: The schema name within the Fivetran system
        :param table: The table name within the Fivetran system
        :return: GeneralApiResponse
        """
        return self._request(
            method="GET",
            url=f"{self.BASE_URL}/connectors/{connector_id}/schemas/{schema}/tables/{table}/columns",
        ).json()