Skip to content

Data Quality

Para desenvolver o desafio de negócio, vamos montar a seguinte ETL:

Fluxo

graph TD; A[Configura Variáveis] --> B[Ler o Banco SQL]; B --> V[Validação do Schema de Entrada]; V -->|Falha| X[Alerta de Erro]; V -->|Sucesso| C[Transformar os KPIs]; C --> Y[Validação do Schema de Saída]; Y -->|Falha| Z[Alerta de Erro]; Y -->|Sucesso| D[Salvar no DuckDB];

Contrato de Dados

Bases: SchemaModel

Define o esquema para a validação de dados de produtos com Pandera.

Este esquema inclui campos básicos para produtos, incluindo um campo de e-mail validado por uma expressão regular.

Attributes:

Name Type Description
id_produto Series[int]

Identificador do produto.

nome Series[str]

Nome do produto.

quantidade Series[int]

Quantidade disponível do produto, deve estar entre 1 e 2000.

preco Series[float]

Preço do produto, deve ser maior que 0.

categoria Series[str]

Categoria do produto.

email Series[str]

E-mail associado ao produto, deve seguir o formato padrão de e-mails.

Source code in app\schema_crm.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ProdutoSchema(pa.SchemaModel):
    """
    Define o esquema para a validação de dados de produtos com Pandera.

    Este esquema inclui campos básicos para produtos, incluindo um campo de e-mail
    validado por uma expressão regular.

    Attributes:
        id_produto (Series[int]): Identificador do produto.
        nome (Series[str]): Nome do produto.
        quantidade (Series[int]): Quantidade disponível do produto, deve estar entre 1 e 2000.
        preco (Series[float]): Preço do produto, deve ser maior que 0.
        categoria (Series[str]): Categoria do produto.
        email (Series[str]): E-mail associado ao produto, deve seguir o formato padrão de e-mails.
    """
    id_produto: Series[int]
    nome: Series[str]
    quantidade: Series[int] = pa.Field(ge=1, le=2000)
    preco: Series[float] = pa.Field(gt=0)
    categoria: Series[str]
    email: Series[str] = pa.Field(regex=email_regex)

    class Config:
        coerce = True # garantir que os tipos de dados estejam iguais ao nosso schema
        strict = True # garantir que os meus dados estejam com a mesma quantidade de colunas do schema

Transformações

Configura variáveis

Carrega as configurações a partir de variáveis de ambiente.

Source code in app\etl.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def load_settings():
    """Carrega as configurações a partir de variáveis de ambiente."""
    dotenv_path = Path.cwd() / '.env'
    load_dotenv(dotenv_path=dotenv_path)

    settings = {
        "db_host": os.getenv("POSTGRES_HOST"),
        "db_user": os.getenv("POSTGRES_USER"),
        "db_pass": os.getenv("POSTGRES_PASSWORD"),
        "db_name": os.getenv("POSTGRES_DB"),
        "db_port": os.getenv("POSTGRES_PORT"),
    }

    return settings

Ler o Banco SQL

Extrai dados do banco de dados SQL usando a consulta fornecida.

Parameters:

Name Type Description Default
query str

A consulta SQL para extrair dados.

required

Returns:

Type Description
DataFrame

Um DataFrame do Pandas contendo os dados extraídos.

Source code in app\etl.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@pa.check_output(schema=ProdutoSchema, lazy=True) #lazy serve para validar toda a linha
def extrair_do_sql(query: str) -> pd.DataFrame:
    """
    Extrai dados do banco de dados SQL usando a consulta fornecida.

    Args:
        query: A consulta SQL para extrair dados.

    Returns:
        Um DataFrame do Pandas contendo os dados extraídos.
    """
    settings = load_settings()

    # Criar a string de conexão com base nas configurações
    connection_string = f"postgresql://{settings['db_user']}:{settings['db_pass']}@{settings['db_host']}:{settings['db_port']}/{settings['db_name']}"

    # Criar engine de conexão
    engine = create_engine(connection_string)

    with engine.connect() as conn, conn.begin():
            df_crm = pd.read_sql(query, conn)

    return df_crm

Incluir novas Colunas de KPI's

Transforma os dados do DataFrame aplicando cálculos e normalizações.

Parameters:

Name Type Description Default
df DataFrame

DataFrame do Pandas contendo os dados originais.

required

Returns:

Type Description
DataFrame

DataFrame do Pandas após a aplicação das transformações.

Source code in app\etl.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@pa.check_input(ProdutoSchema, lazy=True)  #lazy serve para validar toda a linha
@pa.check_output(ProductSchemaKPI, lazy=True) #lazy serve para validar toda a linha
def transformar(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforma os dados do DataFrame aplicando cálculos e normalizações.

    Args:
        df: DataFrame do Pandas contendo os dados originais.

    Returns:
        DataFrame do Pandas após a aplicação das transformações.
    """
    # Calcular valor_total_estoque
    df['valor_total_estoque'] = df['quantidade'] * df['preco']

    # Normalizar categoria para maiúsculas
    df['categoria_normalizada'] = df['categoria'].str.lower()

    # Determinar disponibilidade (True se quantidade > 0)
    df['disponibilidade'] = df['quantidade'] > 0

    return df

Salvando tabela no DuckDB

Carrega o DataFrame no DuckDB, criando ou substituindo a tabela especificada.

Parameters:

Name Type Description Default
df DataFrame

DataFrame do Pandas para ser carregado no DuckDB.

required
table_name str

Nome da tabela no DuckDB onde os dados serão inseridos.

required
db_file str

Caminho para o arquivo DuckDB. Se não existir, será criado.

'my_duckdb.db'
Source code in app\etl.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@pa.check_input(ProductSchemaKPI, lazy=True)
def load_to_duckdb(df: pd.DataFrame, table_name: str, db_file: str = 'my_duckdb.db'):
    """
    Carrega o DataFrame no DuckDB, criando ou substituindo a tabela especificada.

    Args:
        df: DataFrame do Pandas para ser carregado no DuckDB.
        table_name: Nome da tabela no DuckDB onde os dados serão inseridos.
        db_file: Caminho para o arquivo DuckDB. Se não existir, será criado.
    """
    # Conectar ao DuckDB. Se o arquivo não existir, ele será criado.
    con = duckdb.connect(database=db_file, read_only=False)

    # Registrar o DataFrame como uma tabela temporária
    con.register('df_temp', df)

    # Utilizar SQL para inserir os dados da tabela temporária em uma tabela permanente
    # Se a tabela já existir, substitui.
    con.execute(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM df_temp")

    # Fechar a conexão
    con.close()