from airflow.models.baseoperator import BaseOperator
from datacat_integration.hooks import DataCatalogHook

class GetDatacatalogEntryOperator(BaseOperator):
    """Operator that looks up a single entry in the datacatalog.

    ``execute`` returns whatever ``DataCatalogHook.get_entry`` yields for the
    configured ``datacat_type`` and ``oid``.
    """

    def __init__(self, datacat_type: str, oid: str, **kwargs) -> None:
        """Remember the lookup parameters for later use in ``execute``.

        Args:
            datacat_type: Forwarded as the first argument to ``get_entry``.
            oid: Forwarded as the second argument to ``get_entry``.
            **kwargs: Handed unchanged to ``BaseOperator.__init__``.
        """
        # Initialise the base operator before setting our own attributes.
        super().__init__(**kwargs)
        self.datacat_type = datacat_type
        self.oid = oid

    def execute(self, context):
        """Fetch the entry through the hook and return it.

        ``context`` is accepted but not used by this operator.
        """
        # NOTE(review): 'datacatalog' is presumably an Airflow connection id --
        # confirm against DataCatalogHook's constructor.
        entry = DataCatalogHook('datacatalog').get_entry(self.datacat_type, self.oid)
        return entry

class GetDatacatalogEntryConnectionOperator(BaseOperator):
    """Operator that calls ``create_get_entry_connection`` on the datacatalog hook.

    Unlike a plain entry lookup, ``execute`` does not return anything: the
    hook call's result is discarded and the task yields ``None``.
    """

    def __init__(self, datacat_type: str, oid: str, **kwargs) -> None:
        """Remember the lookup parameters for later use in ``execute``.

        Args:
            datacat_type: Forwarded as the first argument to
                ``create_get_entry_connection``.
            oid: Forwarded as the second argument to
                ``create_get_entry_connection``.
            **kwargs: Handed unchanged to ``BaseOperator.__init__``.
        """
        # Initialise the base operator before setting our own attributes.
        super().__init__(**kwargs)
        self.datacat_type = datacat_type
        self.oid = oid

    def execute(self, context):
        """Invoke the hook for the configured entry; nothing is returned.

        ``context`` is accepted but not used by this operator.
        """
        # NOTE(review): "datacatalog" is presumably an Airflow connection id,
        # and the hook method presumably registers a connection derived from
        # the entry -- confirm against DataCatalogHook.
        catalog = DataCatalogHook("datacatalog")
        lookup_args = (self.datacat_type, self.oid)
        catalog.create_get_entry_connection(*lookup_args)