Data Quality
Para desenvolver o desafio de negócio, vamos montar a seguinte ETL:
Fluxo
graph TD;
A[Configura Variáveis] --> B[Ler o Banco SQL];
B --> V[Validação do Schema de Entrada];
V -->|Falha| X[Alerta de Erro];
V -->|Sucesso| C[Transformar os KPIs];
C --> Y[Validação do Schema de Saída];
Y -->|Falha| Z[Alerta de Erro];
Y -->|Sucesso| D[Salvar no DuckDB];
Contrato de Dados
Bases: SchemaModel
Define o esquema para a validação de dados de produtos com Pandera.
Este esquema inclui campos básicos para produtos, incluindo um campo de e-mail
validado por uma expressão regular.
Attributes:
| Name |
Type |
Description |
id_produto |
Series[int]
|
Identificador do produto.
|
nome |
Series[str]
|
|
quantidade |
Series[int]
|
Quantidade disponível do produto, deve estar entre 1 e 2000.
|
preco |
Series[float]
|
Preço do produto, deve ser maior que 0.
|
categoria |
Series[str]
|
|
email |
Series[str]
|
E-mail associado ao produto, deve seguir o formato padrão de e-mails.
|
Source code in app\schema_crm.py
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 | class ProdutoSchema(pa.SchemaModel):
"""
Define o esquema para a validação de dados de produtos com Pandera.
Este esquema inclui campos básicos para produtos, incluindo um campo de e-mail
validado por uma expressão regular.
Attributes:
id_produto (Series[int]): Identificador do produto.
nome (Series[str]): Nome do produto.
quantidade (Series[int]): Quantidade disponível do produto, deve estar entre 1 e 2000.
preco (Series[float]): Preço do produto, deve ser maior que 0.
categoria (Series[str]): Categoria do produto.
email (Series[str]): E-mail associado ao produto, deve seguir o formato padrão de e-mails.
"""
id_produto: Series[int]
nome: Series[str]
quantidade: Series[int] = pa.Field(ge=1, le=2000)
preco: Series[float] = pa.Field(gt=0)
categoria: Series[str]
email: Series[str] = pa.Field(regex=email_regex)
class Config:
coerce = True # garantir que os tipos de dados estejam iguais ao nosso schema
strict = True # garantir que os meus dados estejam com a mesma quantidade de colunas do schema
|
Configura variáveis
Carrega as configurações a partir de variáveis de ambiente.
Source code in app\etl.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | def load_settings():
"""Carrega as configurações a partir de variáveis de ambiente."""
dotenv_path = Path.cwd() / '.env'
load_dotenv(dotenv_path=dotenv_path)
settings = {
"db_host": os.getenv("POSTGRES_HOST"),
"db_user": os.getenv("POSTGRES_USER"),
"db_pass": os.getenv("POSTGRES_PASSWORD"),
"db_name": os.getenv("POSTGRES_DB"),
"db_port": os.getenv("POSTGRES_PORT"),
}
return settings
|
Ler o Banco SQL
Extrai dados do banco de dados SQL usando a consulta fornecida.
Parameters:
| Name |
Type |
Description |
Default |
query |
str
|
A consulta SQL para extrair dados.
|
required
|
Returns:
| Type |
Description |
DataFrame
|
Um DataFrame do Pandas contendo os dados extraídos.
|
Source code in app\etl.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | @pa.check_output(schema=ProdutoSchema, lazy=True) #lazy serve para validar toda a linha
def extrair_do_sql(query: str) -> pd.DataFrame:
"""
Extrai dados do banco de dados SQL usando a consulta fornecida.
Args:
query: A consulta SQL para extrair dados.
Returns:
Um DataFrame do Pandas contendo os dados extraídos.
"""
settings = load_settings()
# Criar a string de conexão com base nas configurações
connection_string = f"postgresql://{settings['db_user']}:{settings['db_pass']}@{settings['db_host']}:{settings['db_port']}/{settings['db_name']}"
# Criar engine de conexão
engine = create_engine(connection_string)
with engine.connect() as conn, conn.begin():
df_crm = pd.read_sql(query, conn)
return df_crm
|
Incluir novas Colunas de KPI's
Transforma os dados do DataFrame aplicando cálculos e normalizações.
Parameters:
| Name |
Type |
Description |
Default |
df |
DataFrame
|
DataFrame do Pandas contendo os dados originais.
|
required
|
Returns:
| Type |
Description |
DataFrame
|
DataFrame do Pandas após a aplicação das transformações.
|
Source code in app\etl.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 | @pa.check_input(ProdutoSchema, lazy=True) #lazy serve para validar toda a linha
@pa.check_output(ProductSchemaKPI, lazy=True) #lazy serve para validar toda a linha
def transformar(df: pd.DataFrame) -> pd.DataFrame:
"""
Transforma os dados do DataFrame aplicando cálculos e normalizações.
Args:
df: DataFrame do Pandas contendo os dados originais.
Returns:
DataFrame do Pandas após a aplicação das transformações.
"""
# Calcular valor_total_estoque
df['valor_total_estoque'] = df['quantidade'] * df['preco']
# Normalizar categoria para maiúsculas
df['categoria_normalizada'] = df['categoria'].str.lower()
# Determinar disponibilidade (True se quantidade > 0)
df['disponibilidade'] = df['quantidade'] > 0
return df
|
Salvando tabela no DuckDB
Carrega o DataFrame no DuckDB, criando ou substituindo a tabela especificada.
Parameters:
| Name |
Type |
Description |
Default |
df |
DataFrame
|
DataFrame do Pandas para ser carregado no DuckDB.
|
required
|
table_name |
str
|
Nome da tabela no DuckDB onde os dados serão inseridos.
|
required
|
db_file |
str
|
Caminho para o arquivo DuckDB. Se não existir, será criado.
|
'my_duckdb.db'
|
Source code in app\etl.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 | @pa.check_input(ProductSchemaKPI, lazy=True)
def load_to_duckdb(df: pd.DataFrame, table_name: str, db_file: str = 'my_duckdb.db'):
"""
Carrega o DataFrame no DuckDB, criando ou substituindo a tabela especificada.
Args:
df: DataFrame do Pandas para ser carregado no DuckDB.
table_name: Nome da tabela no DuckDB onde os dados serão inseridos.
db_file: Caminho para o arquivo DuckDB. Se não existir, será criado.
"""
# Conectar ao DuckDB. Se o arquivo não existir, ele será criado.
con = duckdb.connect(database=db_file, read_only=False)
# Registrar o DataFrame como uma tabela temporária
con.register('df_temp', df)
# Utilizar SQL para inserir os dados da tabela temporária em uma tabela permanente
# Se a tabela já existir, substitui.
con.execute(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM df_temp")
# Fechar a conexão
con.close()
|