-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #12 from dqops/0.4.0
Release 0.4.0
- Loading branch information
Showing
1,047 changed files
with
481,669 additions
and
284,888 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,27 @@ | ||
# 0.4.0 | ||
* Data quality dashboards reorganized, layout unified | ||
* Python client and REST API documentation | ||
* Additional Airflow operators | ||
* Small bug fixes in the UI | ||
* DQOps concepts documentation | ||
|
||
# 0.3.0 | ||
* Improvements for running DQOps as a cloud hosted SaaS platform | ||
* Reorganized data quality dashboard tree to match the structure of checks | ||
* UI screens for managing the configuration: users, default schedules, notifications and more | ||
* Airflow operator for running checks | ||
|
||
# 0.2.1 | ||
* Table comparison support | ||
* Anomaly detection checks renamed | ||
* Bug fixes | ||
|
||
# 0.2.0 | ||
* Reorganization of all data quality checks | ||
* User interface created | ||
* Documentation created for all checks | ||
* Extensive REST API for all operations | ||
* Additional connectors for PostgreSQL, MySQL, SQL Server, Oracle | ||
|
||
# 0.1.0 | ||
* Initial command-line version (preview release) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
0.3.0 | ||
0.4.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
142 changes: 142 additions & 0 deletions
142
distribution/python/dqops/airflow/collect_statistics/dqops_collect_statistics_operator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
import json | ||
import logging | ||
from typing import Any, Dict, List, Union | ||
|
||
from airflow.models.baseoperator import BaseOperator | ||
from httpx import ReadTimeout | ||
|
||
from dqops.airflow.common.exceptions.dqops_job_failed_exception import ( | ||
DqopsJobFailedException, | ||
) | ||
from dqops.airflow.common.tools.client_creator import create_client | ||
from dqops.airflow.common.tools.server_response_verifier import ( | ||
verify_server_response_correctness, | ||
) | ||
from dqops.airflow.common.tools.timeout.dqo_timeout import handle_dqo_timeout | ||
from dqops.airflow.common.tools.timeout.python_client_timeout import ( | ||
handle_python_timeout, | ||
) | ||
from dqops.airflow.common.tools.url_resolver import extract_base_url | ||
from dqops.client import Client | ||
from dqops.client.api.jobs.collect_statistics_on_table import sync_detailed | ||
from dqops.client.models.collect_statistics_queue_job_result import ( | ||
CollectStatisticsQueueJobResult, | ||
) | ||
from dqops.client.models.dqo_job_status import DqoJobStatus | ||
from dqops.client.models.statistics_collector_search_filters import ( | ||
StatisticsCollectorSearchFilters, | ||
) | ||
from dqops.client.models.statistics_collector_target import StatisticsCollectorTarget | ||
from dqops.client.types import UNSET, Response, Unset | ||
|
||
|
||
class DqopsCollectStatisticsOperator(BaseOperator):
    """
    Airflow operator that starts a "collect statistics" job on the DQOps server
    and waits for it to finish, returning the job result as a dictionary.
    """

    def __init__(
        self,
        *,
        connection: Union[Unset, str] = UNSET,
        full_table_name: Union[Unset, str] = UNSET,
        enabled: Union[Unset, bool] = UNSET,
        labels: Union[Unset, List[str]] = UNSET,
        column_names: Union[Unset, List[str]] = UNSET,
        sensor_name: Union[Unset, str] = UNSET,
        target: Union[Unset, StatisticsCollectorTarget] = UNSET,
        base_url: str = "http://localhost:8888/",
        job_business_key: Union[Unset, None, str] = UNSET,
        wait_timeout: Union[Unset, None, int] = UNSET,
        fail_on_timeout: bool = True,
        **kwargs
    ) -> None:
        """
        All parameters are optional. When not set, all statistics will be collected.

        Parameters
        ----------
        connection : Union[Unset, str]
            The connection name to the data source in DQOps.
        full_table_name : Union[Unset, str]
            The schema name with the table name.
        enabled : Union[Unset, bool]
            If set to true, only enabled connections and tables are filtered.
            Otherwise, only disabled connections or tables are used.
        labels : Union[Unset, List[str]]
            The label names edited by users on connections, tables and columns
            in the DQOps platform.
        column_names : Union[Unset, List[str]]
            The names of columns.
        sensor_name : Union[Unset, str]
            The name of the sensor.
        target : Union[Unset, StatisticsCollectorTarget]
            The name of the target, whose value is column or table.
        base_url : str [optional, default="http://localhost:8888/"]
            The base url to the DQOps application.
        job_business_key : Union[Unset, None, str]
            Job business key that is a user assigned unique job id, used to check the
            job status by looking up the job by a user assigned identifier, instead
            of the DQOps assigned job identifier.
        wait_timeout : Union[Unset, None, int]
            Time in seconds that the client will wait for execution. It prevents the
            task from hanging on an action that never completes. If not set, the
            timeout is read from the client defaults, whose value is 120 seconds.
        fail_on_timeout : bool [optional, default=True]
            A timeout marks the task as Failed by default. Set the flag to False to
            mark a timed-out task as Success instead.
        """
        super().__init__(**kwargs)
        self.connection: Union[Unset, str] = connection
        self.full_table_name: Union[Unset, str] = full_table_name
        self.enabled: Union[Unset, bool] = enabled
        self.labels: Union[Unset, List[str]] = labels
        self.column_names: Union[Unset, List[str]] = column_names
        self.sensor_name: Union[Unset, str] = sensor_name
        self.target: Union[Unset, StatisticsCollectorTarget] = target

        self.base_url: str = extract_base_url(base_url)
        self.job_business_key: Union[Unset, None, str] = job_business_key
        # May be UNSET/None; create_client falls back to the client default then.
        self.wait_timeout: Union[Unset, None, int] = wait_timeout
        self.fail_on_timeout: bool = fail_on_timeout

    def execute(self, context) -> Union[Dict[str, Any], None]:
        """Run the collect statistics job and return its result as a dict.

        Returns None when the client-side wait timed out and fail_on_timeout
        is disabled. Raises DqopsJobFailedException when the DQOps job failed.
        """
        client: Client = create_client(
            base_url=self.base_url, wait_timeout=self.wait_timeout
        )

        try:
            # Build the server-side filter from the operator's configuration;
            # UNSET fields are omitted so the server collects everything.
            search_filters: StatisticsCollectorSearchFilters = (
                StatisticsCollectorSearchFilters(
                    connection=self.connection,
                    full_table_name=self.full_table_name,
                    enabled=self.enabled,
                    labels=self.labels,
                    column_names=self.column_names,
                    sensor_name=self.sensor_name,
                    target=self.target,
                )
            )

            response: Response[CollectStatisticsQueueJobResult] = sync_detailed(
                client=client,
                json_body=search_filters,
                job_business_key=self.job_business_key,
                wait=True,
                wait_timeout=self.wait_timeout,
            )
        except ReadTimeout as exception:
            # The python client gave up waiting; decide Failed vs Success here.
            handle_python_timeout(exception, self.fail_on_timeout)
            return None

        verify_server_response_correctness(response)

        job_result: CollectStatisticsQueueJobResult = (
            CollectStatisticsQueueJobResult.from_dict(
                json.loads(response.content.decode("utf-8"))
            )
        )
        logging.info(job_result.to_dict())

        if job_result.status == DqoJobStatus.FAILED:
            raise DqopsJobFailedException(context["ti"], job_result.to_dict())

        # RUNNING after the wait means the server-side wait timed out.
        if job_result.status == DqoJobStatus.RUNNING:
            handle_dqo_timeout(self.fail_on_timeout)

        return job_result.to_dict()
14 changes: 14 additions & 0 deletions
14
...ion/python/dqops/airflow/common/exceptions/dqops_data_quality_issue_detected_exception.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from airflow.exceptions import AirflowException | ||
from airflow.models.taskinstance import TaskInstance | ||
|
||
class DqopsDataQualityIssueDetectedException(AirflowException):
    """
    Exception used in airflow to mark status of task execution as Failed.
    The exception informs that DQO has detected a data quality issue.

    The job's result dictionary is pushed to XCom under "return_value"
    before the task fails, so downstream tasks can still inspect it.
    """

    def __init__(self, ti: TaskInstance, return_value: dict):
        super().__init__("DQOps has detected a data quality issue!")
        ti.xcom_push("return_value", return_value)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 13 additions & 0 deletions
13
distribution/python/dqops/airflow/common/exceptions/dqops_internal_server_error_exception.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from airflow.exceptions import AirflowException | ||
|
||
|
||
class DqopsInternalServerErrorException(AirflowException):
    """
    Exception used in airflow to mark status of task execution as Failed.
    The exception is thrown on internal server error from DQOps server API.
    """

    def __init__(self):
        message: str = "DQOps server responded with Internal Server Error!"
        super().__init__(message)
14 changes: 14 additions & 0 deletions
14
distribution/python/dqops/airflow/common/exceptions/dqops_job_failed_exception.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from airflow.exceptions import AirflowException | ||
from airflow.models.taskinstance import TaskInstance | ||
|
||
class DqopsJobFailedException(AirflowException):
    """
    Exception used in airflow to mark status of task execution as Failed.
    The exception informs that a DQOps' job has failed.

    The job's result dictionary is pushed to XCom under "return_value"
    before the task fails, so downstream tasks can still inspect it.
    """

    def __init__(self, ti: TaskInstance, return_value: dict):
        super().__init__("DQOps job has failed!")
        ti.xcom_push("return_value", return_value)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 13 additions & 0 deletions
13
distribution/python/dqops/airflow/common/exceptions/dqops_unfinished_job_exception.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from airflow.exceptions import AirflowException | ||
|
||
|
||
class DqopsUnfinishedJobException(AirflowException):
    """
    Exception used in airflow to mark status of task execution as Failed.
    The exception informs that wait for job action has not completed yet.
    """

    def __init__(self):
        message: str = "Job has not completed yet."
        super().__init__(message)
29 changes: 29 additions & 0 deletions
29
distribution/python/dqops/airflow/common/tools/client_creator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from typing import Union | ||
|
||
from httpx import Timeout | ||
|
||
from dqops.client import Client | ||
from dqops.client.types import UNSET, Unset | ||
|
||
# extra time for python client to wait for dqo after it times out
EXTRA_TIMEOUT_SECONDS: int = 1


def create_client(base_url: str, *, wait_timeout: Union[int, Unset] = UNSET) -> Client:
    """Creates python client for airflow operators.

    Parameters
    ----------
    base_url : str
        The base url to DQOps application. Default value is http://localhost:8888/
    wait_timeout : Union[int, Unset]
        Time in seconds that the client will wait for execution. It prevents the
        task from hanging on an action that never completes. If not set, the
        timeout is read from the client defaults, whose value is 120 seconds.

    Returns
    -------
    Client
        DQOps client object.
    """
    client: Client = Client(base_url=base_url)
    # Bug fixes vs previous version:
    # 1. Default must be the UNSET sentinel instance, not the Unset class;
    #    otherwise the guard below passes on the default and `Unset + 1` raises.
    # 2. with_timeout returns a new client instance (it does not mutate),
    #    so the result has to be reassigned.
    if wait_timeout is not UNSET:
        # Give the HTTP client slightly more time than the server-side wait,
        # so the server timeout fires first.
        client = client.with_timeout(Timeout(wait_timeout + EXTRA_TIMEOUT_SECONDS))

    return client
Oops, something went wrong.