from airflow.models.baseoperator import BaseOperator

from datacat_integration.hooks import DataCatalogHook


class GetDatacatalogEntryOperator(BaseOperator):
    """Look up a single data catalog entry and hand it back as the task result.

    Args:
        datacat_type: First argument forwarded to ``DataCatalogHook.get_entry``.
        oid: Second argument forwarded to ``DataCatalogHook.get_entry``
            (presumably the entry's object id -- confirm against the hook).
        **kwargs: Passed through unchanged to ``BaseOperator``.
    """

    def __init__(self, datacat_type: str, oid: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.oid = oid
        self.datacat_type = datacat_type

    def execute(self, context):
        """Return whatever ``DataCatalogHook.get_entry`` yields for this entry.

        ``context`` is accepted to satisfy the operator interface and is not
        used here.
        """
        # NOTE(review): 'datacatalog' looks like a connection id understood by
        # the hook -- confirm against DataCatalogHook.
        hook = DataCatalogHook('datacatalog')
        entry = hook.get_entry(self.datacat_type, self.oid)
        return entry


class GetDatacatalogEntryConnectionOperator(BaseOperator):
    """Invoke ``DataCatalogHook.create_get_entry_connection`` for one entry.

    Unlike ``GetDatacatalogEntryOperator``, this task does not return the
    hook's result; ``execute`` always returns ``None``.

    Args:
        datacat_type: First argument forwarded to
            ``DataCatalogHook.create_get_entry_connection``.
        oid: Second argument forwarded to
            ``DataCatalogHook.create_get_entry_connection``
            (presumably the entry's object id -- confirm against the hook).
        **kwargs: Passed through unchanged to ``BaseOperator``.
    """

    def __init__(self, datacat_type: str, oid: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.oid = oid
        self.datacat_type = datacat_type

    def execute(self, context):
        """Call the hook for its side effect; the return value is discarded.

        ``context`` is accepted to satisfy the operator interface and is not
        used here.
        """
        # NOTE(review): presumably this sets up a connection derived from the
        # catalog entry -- verify in DataCatalogHook.
        catalog_hook = DataCatalogHook("datacatalog")
        catalog_hook.create_get_entry_connection(self.datacat_type, self.oid)