⏱️ Leitura: 25-30 minutos · 🎯 Público: desenvolvedores Python, arquitetos, administradores técnicos da Cortex. Requer familiaridade com Agentes Especializados, Agentes Autônomos e Python ≥ 3.10.
Esta página é a referência para usuários avançados que vão estender a Cortex com código Python — dando aos agentes poderes que eles não têm por padrão (ferramentas) ou mudando como a plataforma trata cada mensagem (funções).
Aqui você aprende a escrever Tools, Pipes, Filters e Actions — com estrutura de código, exemplos funcionais, padrões recomendados, governança e armadilhas comuns.
🚨 AVISO DE SEGURANÇA CRÍTICO
Tools e Functions executam código Python arbitrário no servidor da Cortex. Dar a um usuário permissão para criar/importar tools é equivalente a dar acesso shell ao servidor. Leia a seção de Governança e segurança antes de qualquer coisa. Apenas administradores de confiança devem ter essa permissão.
🚚 COMPATIBILIDADE — Cortex 0.9.0+ (async-first)
A partir da versão 0.9.0, a Cortex migrou toda a camada de dados (ORM/SQLAlchemy 2.0) para assíncrona de ponta a ponta. Isso afeta quem acessa modelos internos da plataforma (
Users,Chats,Files,Tools,Functions,Knowledges, etc.) de dentro de Tools/Functions — toda chamada precisa deawait. As assinaturas dos entrypoints (pipe,inlet,outlet,stream,action, métodos de Tool) não mudaram — apenas o corpo. Se você desenvolveu plugins para versões anteriores, veja o guia 🚚 Migração para Cortex 0.9.0+ ao final desta página antes de publicar. Os exemplos desta página já seguem o padrão async-first compatível com 0.9.0+.
A extensibilidade da Cortex tem seis caminhos distintos. Escolher o certo economiza horas (e dor de cabeça):
| Tipo | Onde roda | Quem vê | Melhor para |
|---|---|---|---|
| 🛠️ Tool | No processo da Cortex (Python) | Modelo invoca durante a conversa | Dar ao agente uma nova habilidade (API, DB, cálculo) |
| 🧩 Pipe Function | No processo da Cortex (Python) | Aparece como modelo novo no seletor | Adicionar provedor de IA exótico ou criar interface não-LLM |
| 🔬 Filter Function | No processo da Cortex (Python) | Invisível — roda antes/depois de cada mensagem | Interceptar input/output (PII, tradução, logs, compliance) |
| 🎯 Action Function | No processo da Cortex (Python) | Botão clicável na mensagem | Ações manuais sobre uma mensagem (exportar, compartilhar) |
| 🌐 MCP Server | Fora da Cortex (HTTP/stdio) | Tools descobertas automaticamente | Conectar servidores MCP padrão (comunidade, parceiros) |
| 📡 OpenAPI Server | Fora da Cortex (HTTP) | Endpoints viram tools | Integrar APIs REST existentes com OpenAPI/Swagger |
┌─────────────────────────────────────────────────────────┐
│ 💬 MENSAGEM DO USUÁRIO │
└───────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 🔬 FILTER.inlet() │
│ (modifica a mensagem antes do modelo ver) │
└───────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 🤖 MODELO (ou 🧩 PIPE) │
│ O modelo decide chamar uma ferramenta? │
│ ├── ✅ SIM → 🛠️ TOOL executa → resultado volta │
│ └── ❌ NÃO → continua geração │
└───────────────────────┬─────────────────────────────────┘
│
▼ (streaming)
┌─────────────────────────────────────────────────────────┐
│ 🔬 FILTER.stream() │
│ (intercepta cada chunk de streaming) │
└───────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 🔬 FILTER.outlet() │
│ (processa a resposta completa) │
└───────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 💬 RESPOSTA NA TELA │
│ [🎯 Actions aparecem como botões opcionais] │
└─────────────────────────────────────────────────────────┘
| Cenário | Solução recomendada | Por quê |
|---|---|---|
| "Quero que o agente consulte o CRM durante a conversa" | 🛠️ Tool | Modelo decide chamar quando fizer sentido |
| "Preciso adicionar a API interna Y como se fosse um modelo" | 🧩 Pipe Function | Aparece no seletor de modelos como 'Y' |
| "Quero mascarar CPF em todas as mensagens de todos os agentes" | 🔬 Filter Function (inlet) global | Atua transversalmente, sem modificar agentes |
| "Preciso fazer log no Splunk de cada resposta" | 🔬 Filter Function (outlet) global | Captura após a resposta, pronta |
| "Quero um botão 'Exportar para PDF' em cada resposta" | 🎯 Action Function | UI interativa, acionada pelo usuário |
| "Tenho um servidor MCP padrão rodando na infra" | 🌐 MCP Server | Conectar sem escrever código |
| "Tenho uma API REST com OpenAPI 3.0" | 📡 OpenAPI Server | Tools geradas automaticamente |
| "Preciso de GPU para o processamento" | 🔧 Pipeline (container separado) | Off-load da instância principal |
Uma Tool é um arquivo Python com:
ToolsValves e UserValves para configuração"""
title: Minha Primeira Tool
author: Seu Nome
author_url: https://sua.empresa
version: 1.0.0
description: O que esta tool faz, em uma frase
required_open_webui_version: 0.9.0
requirements: httpx
"""
from pydantic import BaseModel, Field
class Tools:
def __init__(self):
"""Inicializa a Tool."""
self.valves = self.Valves()
class Valves(BaseModel):
api_key: str = Field("", description="Chave da API externa")
async def minha_acao(self, parametro: str) -> str:
"""
Descrição clara do que esta ação faz.
O modelo lê ESTA docstring para decidir quando te chamar.
:param parametro: descrição do parâmetro
:return: resultado em texto
"""
return f"Processei: {parametro}"
💡 Nota v0.9.0+: declare
required_open_webui_version: 0.9.0quando sua Tool depender de comportamentos async da plataforma (acesso a modelos internos, sessão de DB async, etc.). A Cortex recusa carregar plugins cujo requisito mínimo seja maior que a versão instalada, protegendo você de rodar código incompatível.
Tools (exatamente). A Cortex detecta por isso.async def — na Cortex 0.9.0+, o runtime é async-first de ponta a ponta. Tools que acessam modelos internos (Users, Chats, Files, Knowledges, etc.) precisam ser async def e usar await em toda chamada de modelo. Tools que só fazem cálculo local ou chamam APIs externas podem ser síncronas, mas async ainda é o padrão recomendado — e libs síncronas legadas devem ser envolvidas com anyio.to_thread.run_sync(...) para não bloquear o event loop. Veja Migração para Cortex 0.9.0+.httpx.AsyncClient, aiofiles, asyncpg, aiomysql, redis.asyncio. Evite requests, urllib, psycopg2 síncrono dentro de métodos async — eles bloqueiam o loop e degradam a plataforma inteira.Type hints podem ser complexos — Pydantic gera o schema:
from typing import Optional
async def buscar(
self,
query: str, # string obrigatória
limite: int = 10, # int com default
filtros: Optional[list[str]] = None, # lista opcional
ordenar: list[tuple[str, int]] = None, # aninhado
) -> dict:
"""..."""
Argumentos com prefixo __ são injetados automaticamente pela Cortex — você não precisa pensar neles como parâmetros do modelo. Use-os quando precisar:
| Argumento | O que contém |
|---|---|
__user__ |
Dict com {id, email, name, role, valves} — inclui UserValves |
__event_emitter__ |
Função async para emitir eventos (status, mensagens, citações) |
__event_call__ |
Como emitter mas para interação com usuário (input, confirmação) |
__metadata__ |
Metadados da conversa (agente, modelo, parâmetros) |
__messages__ |
Histórico da conversa até o momento |
__files__ |
Arquivos anexados pelo usuário |
__model__ |
Info do modelo em uso |
__oauth_token__ |
Token OAuth do usuário (para chamar APIs em nome dele) |
__request__ |
Objeto da requisição HTTP original |
Exemplo:
async def acao_autenticada(
self,
parametro: str,
__user__: dict = None, # injetado
__event_emitter__ = None, # injetado
__oauth_token__: dict = None, # injetado
) -> str:
"""Ação que usa contexto do usuário e emite status."""
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Processando...", "done": False}
})
user_email = __user__["email"] if __user__ else "anonimo"
# ... lógica ...
return f"OK, {user_email}"
Valves são o mecanismo de configuração da Tool/Function. Dois níveis:
| Nível | Classe | Quem configura | Onde |
|---|---|---|---|
| Admin | Valves |
Apenas administradores | Espaço de Trabalho → Ferramentas → ⚙️ |
| Usuário | UserValves |
Cada usuário pra si | Interface de chat ao usar a tool |
"""
title: Consultor de Clima Corporativo
author: SinapseTech
version: 1.0.0
description: Consulta previsão do tempo via API configurada
requirements: httpx
"""
import httpx
from pydantic import BaseModel, Field
class Tools:
def __init__(self):
self.valves = self.Valves()
# Configuração administrativa (global, mesma para todos)
class Valves(BaseModel):
api_base_url: str = Field(
default="https://api.openweathermap.org/data/2.5",
description="URL base da API meteorológica"
)
api_key: str = Field(
default="",
description="Chave da API (obrigatório)"
)
timeout: int = Field(
default=10,
description="Timeout em segundos"
)
# Configuração por usuário (opcional, cada um a sua)
class UserValves(BaseModel):
unidade: str = Field(
default="metric",
description="metric (Celsius) ou imperial (Fahrenheit)"
)
idioma: str = Field(
default="pt_br",
description="Idioma da descrição"
)
async def consultar_clima(
self,
cidade: str,
__user__: dict = None,
) -> str:
"""
Consulta o clima atual de uma cidade.
:param cidade: nome da cidade (ex.: 'São Paulo')
:return: descrição do clima em texto
"""
if not self.valves.api_key:
return "Erro: API key não configurada pelo administrador"
# UserValves é acessado via __user__
user_valves = (
__user__.get("valves") if __user__ else self.UserValves()
)
async with httpx.AsyncClient(timeout=self.valves.timeout) as client:
r = await client.get(
f"{self.valves.api_base_url}/weather",
params={
"q": cidade,
"appid": self.valves.api_key,
"units": user_valves.unidade,
"lang": user_valves.idioma,
}
)
if r.status_code != 200:
return f"Erro na consulta: HTTP {r.status_code}"
data = r.json()
temp = data["main"]["temp"]
desc = data["weather"][0]["description"]
umidade = data["main"]["humidity"]
unidade_txt = "°C" if user_valves.unidade == "metric" else "°F"
return (
f"Clima em {cidade}: {desc}, "
f"{temp}{unidade_txt}, umidade {umidade}%"
)
Field(default=..., description="...") sempre — a descrição aparece na UIEmailStr, HttpUrl, conint, etc.)UserValves (qualquer usuário vê)Tools que demoram ou fazem múltiplas etapas devem avisar o usuário do progresso. O __event_emitter__ faz isso.
✅ Compatibilidade v0.9.0+: os argumentos
__event_emitter__e__event_call__não mudaram na migração async. Você continua recebendo os callables já construídos pela plataforma e usandoawait __event_emitter__({...})exatamente como antes. A mudança aconteceu no construtor interno (que agora é async) — mas é invisível para quem escreve plugins.
| Tipo | O que faz | Compatível em Native Mode? |
|---|---|---|
status |
Indicador de progresso acima da resposta | ✅ Sim |
notification |
Toast notification | ✅ Sim |
citation |
Fontes citadas | ✅ Sim |
chat:title |
Atualiza título da conversa | ✅ Sim |
chat:tags |
Atualiza tags | ✅ Sim |
message / replace |
Acrescenta/substitui conteúdo da mensagem | ⚠️ Só Default Mode |
confirmation |
Pede confirmação do usuário | ✅ Sim |
input |
Pede input do usuário | ✅ Sim |
⚠️ Importante: eventos de conteúdo (
message,replace) não funcionam bem em Native Mode — eles são sobrescritos pelo streaming do modelo. Use sóstatus,notification,citationse quiser compatibilidade com ambos os modos. Veja Modos de function calling.
async def processar(self, arquivo: str, __event_emitter__=None) -> str:
"""Processa um arquivo com feedback visual."""
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Carregando arquivo...", "done": False}
})
# ... carga real ...
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Analisando 10.000 linhas...", "done": False}
})
# ... análise real ...
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Pronto!", "done": True, "hidden": False}
})
return "Resultado da análise: ..."
Quando sua tool traz dados de fontes, cite-as para transparência:
from datetime import datetime
async def buscar_base_interna(
self,
consulta: str,
__event_emitter__=None,
) -> str:
"""Busca em base interna e cita as fontes."""
resultados = self._buscar(consulta) # sua lógica
if __event_emitter__:
for r in resultados:
await __event_emitter__({
"type": "citation",
"data": {
"document": [r["trecho"]],
"metadata": [{
"date_accessed": datetime.now().isoformat(),
"source": r["titulo"],
"url": r["link"],
}],
"source": {
"name": r["titulo"],
"url": r["link"],
}
}
})
return "\n\n".join(r["trecho"] for r in resultados)
💡 Para citações manuais funcionarem, defina
self.citation = Falseem__init__— senão a Cortex injeta citações automáticas que sobrescrevem as suas.
Dois modos controlam como o modelo decide chamar tools:
A Cortex injeta no prompt um template que orienta o modelo a retornar JSON com a tool a ser chamada.
message/replace events de toolsO modelo usa function calling nativo (estruturas JSON na API), sem injeção de prompt.
gpt-5, claude-opus-4-7, claude-sonnet-4-6, gemini-3-pro, grok-4.20-reasoning, o3, o4-mini| Situação | Modo |
|---|---|
| Tool simples, retorna um valor | Native |
| Tool com múltiplas etapas e raciocínio | Native |
Precisa atualizar a mensagem em tempo real (message/replace) |
Default |
| Modelo pequeno/antigo sem function calling | Default |
| Precisa das built-in system tools (memória, notas, RAG) | Native |
⚠️ Quando usar Native Mode em um agente com base de conhecimento: o RAG não é injetado automaticamente. O modelo precisa chamar as built-in tools de conhecimento. Oriente isso no prompt do sistema ("use
query_knowledge_filesquando precisar de documentação interna"). Veja RAG agêntico.
Sete exemplos prontos para copiar, adaptar e usar.
"""
title: String Inverter
author: Seu Nome
version: 1.0.0
description: Inverte strings e calcula comprimento
"""
class Tools:
def __init__(self):
pass
async def inverter_string(self, texto: str) -> str:
"""
Inverte a string fornecida.
:param texto: string a ser invertida
:return: string invertida
"""
return texto[::-1]
async def contar_caracteres(self, texto: str) -> dict:
"""
Conta caracteres, palavras e linhas.
:param texto: texto para contar
:return: estatísticas como dict
"""
return {
"caracteres": len(texto),
"caracteres_sem_espaco": len(texto.replace(" ", "")),
"palavras": len(texto.split()),
"linhas": len(texto.splitlines()) or 1,
}
Padrão comum: chamar uma API corporativa (CRM, ERP, etc.).
"""
title: Consulta CRM - Clientes
author: SinapseTech
version: 1.0.0
description: Consulta dados de clientes no CRM corporativo
requirements: httpx
"""
import httpx
from pydantic import BaseModel, Field
class Tools:
def __init__(self):
self.valves = self.Valves()
class Valves(BaseModel):
crm_base_url: str = Field(
default="",
description="URL base da API do CRM (ex.: https://crm.empresa.com/api/v1)"
)
api_token: str = Field(
default="",
description="Token de autenticação do CRM"
)
timeout: int = Field(default=15, description="Timeout em segundos")
async def buscar_cliente_por_cnpj(
self,
cnpj: str,
__event_emitter__=None,
) -> dict:
"""
Busca cliente no CRM pelo CNPJ.
:param cnpj: CNPJ (com ou sem formatação)
:return: dict com dados do cliente ou erro
"""
cnpj_limpo = "".join(c for c in cnpj if c.isdigit())
if len(cnpj_limpo) != 14:
return {"erro": "CNPJ inválido (deve ter 14 dígitos)"}
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Consultando CNPJ {cnpj}...", "done": False}
})
headers = {"Authorization": f"Bearer {self.valves.api_token}"}
try:
async with httpx.AsyncClient(timeout=self.valves.timeout) as client:
r = await client.get(
f"{self.valves.crm_base_url}/clientes",
params={"cnpj": cnpj_limpo},
headers=headers,
)
r.raise_for_status()
dados = r.json()
except httpx.HTTPStatusError as e:
return {"erro": f"HTTP {e.response.status_code}"}
except Exception as e:
return {"erro": f"Falha na consulta: {e}"}
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Cliente encontrado", "done": True}
})
return {
"razao_social": dados.get("razao_social"),
"fantasia": dados.get("nome_fantasia"),
"status": dados.get("status"),
"receita_mensal": dados.get("mrr"),
"responsavel_cs": dados.get("cs_owner"),
"ultimo_contato": dados.get("last_touch"),
}
async def listar_oportunidades_abertas(
self,
cnpj: str,
__event_emitter__=None,
) -> list[dict]:
"""
Lista oportunidades comerciais em aberto para um cliente.
:param cnpj: CNPJ do cliente
:return: lista de oportunidades
"""
cnpj_limpo = "".join(c for c in cnpj if c.isdigit())
headers = {"Authorization": f"Bearer {self.valves.api_token}"}
async with httpx.AsyncClient(timeout=self.valves.timeout) as client:
r = await client.get(
f"{self.valves.crm_base_url}/opportunities",
params={"cnpj": cnpj_limpo, "status": "open"},
headers=headers,
)
if r.status_code != 200:
return []
return r.json().get("items", [])
Como o modelo usa isso:
buscar_cliente_por_cnpj("12.345.678/0001-90") → recebe dados → responde.listar_oportunidades_abertas(cnpj=...) → lista para o usuário.Padrão crítico para casos onde a API externa exige identidade do usuário (nunca use um token compartilhado). A Cortex injeta __oauth_token__ automaticamente quando configurado.
"""
title: Meu Microsoft 365
author: SinapseTech
version: 1.0.0
description: Acessa recursos do Microsoft 365 no contexto do usuário logado
requirements: httpx
"""
import httpx
from typing import Optional
class Tools:
def __init__(self):
pass
async def listar_meus_emails(
self,
limite: int = 10,
__oauth_token__: Optional[dict] = None,
) -> dict:
"""
Lista os últimos e-mails do usuário logado (Microsoft Graph).
:param limite: quantos e-mails retornar (máx 50)
:return: dict com mensagens ou erro
"""
if not __oauth_token__ or "access_token" not in __oauth_token__:
return {
"erro": "Você não está autenticado via SSO Microsoft ou o "
"token não está disponível para esta tool."
}
limite = max(1, min(limite, 50))
access_token = __oauth_token__["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
async with httpx.AsyncClient(timeout=15) as client:
r = await client.get(
"https://graph.microsoft.com/v1.0/me/messages",
params={
"$top": limite,
"$select": "subject,from,receivedDateTime,bodyPreview",
"$orderby": "receivedDateTime desc",
},
headers=headers,
)
if r.status_code != 200:
return {"erro": f"Microsoft Graph retornou {r.status_code}"}
mensagens = r.json().get("value", [])
return {
"total": len(mensagens),
"mensagens": [
{
"assunto": m.get("subject"),
"de": m.get("from", {}).get("emailAddress", {}).get("address"),
"recebido_em": m.get("receivedDateTime"),
"previa": m.get("bodyPreview"),
}
for m in mensagens
],
}
Vantagens do OAuth por usuário:
Padrão muito comum: o modelo faz uma pergunta de negócio, a tool traduz para SQL seguro e retorna.
"""
title: Consulta Vendas
author: SinapseTech
version: 1.0.0
description: Consulta agregações de vendas no Data Warehouse
requirements: asyncpg, pydantic
"""
import asyncpg
from pydantic import BaseModel, Field
from datetime import date, timedelta
class Tools:
def __init__(self):
self.valves = self.Valves()
class Valves(BaseModel):
dsn: str = Field(
default="",
description="DSN de conexão ao DW (postgresql://user:pass@host/db)"
)
schema_: str = Field(
default="analytics",
description="Schema com as tabelas de vendas",
alias="schema",
)
async def _conn(self):
return await asyncpg.connect(self.valves.dsn)
async def vendas_por_periodo(
self,
dias: int = 30,
agrupar_por: str = "produto",
) -> list[dict]:
"""
Retorna vendas dos últimos N dias, agrupadas por dimensão.
:param dias: quantos dias para trás (1 a 365)
:param agrupar_por: 'produto', 'regiao', 'vendedor' ou 'canal'
:return: lista de agregações ordenadas por valor desc
"""
dias = max(1, min(dias, 365))
dimensoes_validas = {"produto", "regiao", "vendedor", "canal"}
if agrupar_por not in dimensoes_validas:
return [{"erro": f"agrupar_por deve ser um de {dimensoes_validas}"}]
data_ini = date.today() - timedelta(days=dias)
# Importante: coluna de agrupamento é validada contra allowlist
# (não concatena string do usuário em SQL)
sql = f"""
SELECT
{agrupar_por} AS dimensao,
SUM(valor_total) AS total_vendas,
COUNT(DISTINCT pedido_id) AS num_pedidos,
AVG(valor_total) AS ticket_medio
FROM {self.valves.schema_}.vendas
WHERE data_pedido >= $1
GROUP BY {agrupar_por}
ORDER BY total_vendas DESC
LIMIT 20
"""
conn = await self._conn()
try:
rows = await conn.fetch(sql, data_ini)
finally:
await conn.close()
return [
{
"dimensao": r["dimensao"],
"total_vendas": float(r["total_vendas"] or 0),
"num_pedidos": r["num_pedidos"],
"ticket_medio": float(r["ticket_medio"] or 0),
}
for r in rows
]
🔒 Segurança: nunca concatene parâmetros do modelo diretamente em SQL. Use queries parametrizadas (
$1,$2) e allowlists para nomes de coluna/tabela (validação antes de usar).
Para ações de alto impacto (write/delete), peça confirmação explícita:
"""
title: Gestão de Tickets
author: SinapseTech
version: 1.0.0
description: Criação e atualização de tickets no ITSM
requirements: httpx
"""
import httpx
from pydantic import BaseModel, Field
class Tools:
def __init__(self):
self.valves = self.Valves()
class Valves(BaseModel):
itsm_url: str = Field(default="", description="URL do ITSM")
api_token: str = Field(default="", description="Token")
async def fechar_ticket(
self,
ticket_id: str,
resolucao: str,
__event_call__=None,
__event_emitter__=None,
) -> str:
"""
Fecha um ticket no ITSM com a resolução informada.
PEDE CONFIRMAÇÃO DO USUÁRIO antes de executar.
:param ticket_id: ID do ticket (ex.: INC-12345)
:param resolucao: texto de resolução
"""
if __event_call__ is None:
return "Este comando requer interação. Ative function calling default."
# Pede confirmação antes de qualquer write
resposta = await __event_call__({
"type": "confirmation",
"data": {
"message": (
f"Tem certeza que deseja FECHAR o ticket {ticket_id}?\n\n"
f"Resolução:\n{resolucao}"
)
}
})
if not resposta:
return "❌ Operação cancelada pelo usuário."
# Só agora executa a ação
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Fechando {ticket_id}...", "done": False}
})
headers = {"Authorization": f"Bearer {self.valves.api_token}"}
async with httpx.AsyncClient(timeout=10) as client:
r = await client.patch(
f"{self.valves.itsm_url}/tickets/{ticket_id}",
json={"status": "closed", "resolution": resolucao},
headers=headers,
)
if r.status_code not in (200, 204):
return f"❌ Falha ao fechar ticket (HTTP {r.status_code})"
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Ticket fechado", "done": True}
})
return f"✅ Ticket {ticket_id} fechado com sucesso."
Quando faz sentido ter várias ações correlatas na mesma tool:
"""
title: Gestão de Colaboradores (RH)
author: SinapseTech
version: 1.0.0
description: Consultas ao sistema de RH da empresa
requirements: httpx
"""
import httpx
from typing import Optional
from pydantic import BaseModel, Field
class Tools:
def __init__(self):
self.valves = self.Valves()
class Valves(BaseModel):
hris_url: str = Field(default="", description="URL do HRIS")
api_token: str = Field(default="")
async def _get(self, path: str, params: Optional[dict] = None):
headers = {"Authorization": f"Bearer {self.valves.api_token}"}
async with httpx.AsyncClient(timeout=10) as client:
r = await client.get(
f"{self.valves.hris_url}{path}",
params=params,
headers=headers,
)
r.raise_for_status()
return r.json()
async def buscar_colaborador(self, nome_ou_email: str) -> dict:
"""
Busca colaborador por nome parcial ou e-mail.
:param nome_ou_email: parte do nome ou e-mail completo
:return: dados do colaborador
"""
res = await self._get("/employees/search", {"q": nome_ou_email})
if not res.get("items"):
return {"erro": "Nenhum colaborador encontrado"}
return res["items"][0] # primeiro match
async def consultar_saldo_ferias(self, email: str) -> dict:
"""
Consulta saldo de férias do colaborador.
:param email: e-mail corporativo
:return: dict com dias disponíveis e vencimentos
"""
return await self._get(f"/employees/{email}/vacation-balance")
async def listar_feriados_do_ano(self, ano: int) -> list[dict]:
"""
Lista feriados nacionais e municipais do ano.
:param ano: ano de 4 dígitos (ex.: 2026)
:return: lista de feriados
"""
if ano < 2020 or ano > 2030:
return [{"erro": "Ano fora do intervalo suportado (2020-2030)"}]
res = await self._get("/holidays", {"year": ano})
return res.get("holidays", [])
async def organograma_equipe(self, gestor_email: str) -> dict:
"""
Retorna a hierarquia direta e indireta de um gestor.
:param gestor_email: e-mail do gestor
:return: árvore com subordinados
"""
return await self._get(f"/employees/{gestor_email}/tree")
Quando a tool retorna dataset estruturado que o modelo vai processar com code interpreter:
"""
title: Extrator de Métricas
author: SinapseTech
version: 1.0.0
description: Extrai métricas agregadas do sistema de monitoramento
requirements: httpx
"""
import httpx
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
class Tools:
def __init__(self):
self.valves = self.Valves()
class Valves(BaseModel):
prometheus_url: str = Field(default="http://prometheus:9090")
async def consultar_metrica(
self,
metrica: str,
horas_atras: int = 24,
intervalo_min: int = 5,
) -> dict:
"""
Consulta uma métrica do Prometheus no intervalo especificado.
Retorna a série temporal completa em formato JSON pronto
para análise com código Python (pandas, matplotlib).
:param metrica: nome da métrica (ex.: 'http_requests_total')
:param horas_atras: janela temporal para trás (padrão 24h)
:param intervalo_min: intervalo entre pontos em minutos
:return: dict com timestamps e valores
"""
fim = datetime.utcnow()
inicio = fim - timedelta(hours=horas_atras)
async with httpx.AsyncClient(timeout=30) as client:
r = await client.get(
f"{self.valves.prometheus_url}/api/v1/query_range",
params={
"query": metrica,
"start": inicio.isoformat() + "Z",
"end": fim.isoformat() + "Z",
"step": f"{intervalo_min}m",
},
)
if r.status_code != 200:
return {"erro": f"Prometheus: HTTP {r.status_code}"}
payload = r.json()
if payload.get("status") != "success":
return {"erro": "Query inválida", "detalhe": payload}
series = payload["data"]["result"]
return {
"metrica": metrica,
"inicio": inicio.isoformat() + "Z",
"fim": fim.isoformat() + "Z",
"passo_minutos": intervalo_min,
"series": [
{
"labels": s["metric"],
"pontos": [
{"ts": float(p[0]), "valor": float(p[1])}
for p in s["values"]
]
}
for s in series
],
}
Fluxo esperado:
consultar_metrica("rate(http_errors_5xx[5m])", horas_atras=6)Functions são extensões da plataforma, não do modelo. Três tipos detectados automaticamente pelo nome da classe.
Registra-se como um "modelo" no seletor. Quando o usuário escolhe, o método pipe() lida com a requisição inteira — pode nem haver LLM por trás.
Casos de uso:
"""
title: Assistente Consultor Financeiro
author: SinapseTech
version: 1.0.0
description: Pipe que combina análise de mercado + RAG interno
requirements: openai, httpx
"""
from pydantic import BaseModel, Field
class Pipe:
class Valves(BaseModel):
openai_key: str = Field(default="")
market_api_url: str = Field(default="https://market.api.com")
def __init__(self):
self.type = "pipe"
self.id = "consultor-financeiro"
self.name = "Consultor Financeiro (Cortex)"
self.valves = self.Valves()
async def pipe(
self,
body: dict,
__user__: dict = None,
__event_emitter__ = None,
):
"""
Lida com a requisição do usuário.
body contém: messages, model, temperature, etc.
"""
mensagens = body.get("messages", [])
ultima = mensagens[-1]["content"] if mensagens else ""
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Consultando mercado...", "done": False}
})
# 1. Busca dados de mercado (chamada externa)
dados_mercado = await self._fetch_market(ultima)
# 2. Combina com RAG interno (se houver)
# ... lógica ...
# 3. Chama LLM com contexto enriquecido
# ... lógica com OpenAI/Anthropic ...
resposta = f"Análise baseada em: {dados_mercado}\n\n[... análise ...]"
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Concluído", "done": True}
})
return resposta
async def _fetch_market(self, consulta: str) -> dict:
# ... implementação ...
return {}
Para expor múltiplos modelos a partir do mesmo pipe (manifold), implemente pipes():
class Pipe:
# ... __init__, Valves ...
def pipes(self):
return [
{"id": "consultor-renda-fixa", "name": "Consultor Renda Fixa"},
{"id": "consultor-variavel", "name": "Consultor Renda Variável"},
{"id": "consultor-cripto", "name": "Consultor Cripto"},
]
async def pipe(self, body, __user__=None):
model_id = body.get("model") # qual foi selecionado
# ... roteia por model_id ...
Filter intercepta todas as mensagens em 3 pontos:
inlet(body) — antes do modelo receberstream(chunk) — durante o streamingoutlet(body) — depois do modelo responderCasos de uso: PII, compliance, logs, rate limit, reescrita, tradução.
"""
title: Filtro LGPD Corporativo
author: SinapseTech
version: 1.0.0
description: Mascara PII em entrada e saída
"""
import re
from pydantic import BaseModel, Field
class Filter:
class Valves(BaseModel):
mascarar_cpf: bool = Field(default=True)
mascarar_email: bool = Field(default=True)
mascarar_telefone: bool = Field(default=True)
log_bloqueios: bool = Field(default=True)
def __init__(self):
self.valves = self.Valves()
self.toggle = True # usuário pode desligar por chat
def _mascarar(self, texto: str) -> tuple[str, list[str]]:
"""Retorna (texto_mascarado, lista_de_achados)."""
achados = []
if self.valves.mascarar_cpf:
padrao_cpf = r"\b\d{3}\.?\d{3}\.?\d{3}-?\d{2}\b"
if re.search(padrao_cpf, texto):
achados.append("CPF")
texto = re.sub(padrao_cpf, "[CPF MASCARADO]", texto)
if self.valves.mascarar_email:
padrao_email = r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"
if re.search(padrao_email, texto):
achados.append("EMAIL")
texto = re.sub(padrao_email, "[EMAIL MASCARADO]", texto)
if self.valves.mascarar_telefone:
padrao_tel = r"\b\(?\d{2}\)?\s*9?\s*\d{4}-?\d{4}\b"
if re.search(padrao_tel, texto):
achados.append("TELEFONE")
texto = re.sub(padrao_tel, "[TEL MASCARADO]", texto)
return texto, achados
async def inlet(self, body: dict, __user__: dict = None) -> dict:
"""Chamado antes do modelo receber a mensagem."""
mensagens = body.get("messages", [])
for msg in mensagens:
if msg.get("role") == "user" and isinstance(msg.get("content"), str):
texto_limpo, achados = self._mascarar(msg["content"])
msg["content"] = texto_limpo
if achados and self.valves.log_bloqueios:
# log aqui: enviar para SIEM, salvar, etc.
print(f"[LGPD] Usuário {__user__['id']}: {achados}")
return body
async def outlet(self, body: dict, __user__: dict = None) -> dict:
"""Chamado depois do modelo responder."""
mensagens = body.get("messages", [])
for msg in mensagens:
if msg.get("role") == "assistant" and isinstance(msg.get("content"), str):
texto_limpo, _ = self._mascarar(msg["content"])
msg["content"] = texto_limpo
return body
Filters podem ser:
self.toggle = True) — o usuário pode ligar/desligar por chatAdiciona um botão clicável abaixo de cada resposta. Quando clicado, executa Python com acesso total à conversa e ao event system.
"""
title: Exportar para PDF
author: SinapseTech
version: 1.0.0
description: Converte a resposta em PDF corporativo
requirements: reportlab
"""
from pydantic import BaseModel, Field
from io import BytesIO
class Action:
class Valves(BaseModel):
template: str = Field(default="padrao", description="padrao | executivo")
logo_url: str = Field(default="", description="URL do logo")
def __init__(self):
self.valves = self.Valves()
async def action(
self,
body: dict,
__user__: dict = None,
__event_emitter__=None,
) -> dict:
"""
Executado quando o usuário clica no botão 'Exportar para PDF'.
`body` contém a mensagem atual.
"""
conteudo = body.get("content", "")
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Gerando PDF...", "done": False}
})
# Gerar PDF (exemplo simplificado)
from reportlab.pdfgen import canvas
buf = BytesIO()
c = canvas.Canvas(buf)
# ... renderizar conteúdo ...
c.save()
# Salvar e disponibilizar
pdf_bytes = buf.getvalue()
# ... upload para storage, gerar URL ...
url_publica = "https://..."
if __event_emitter__:
await __event_emitter__({
"type": "notification",
"data": {"content": f"PDF pronto: {url_publica}"}
})
await __event_emitter__({
"type": "status",
"data": {"description": "PDF gerado", "done": True}
})
return {"status": "success", "url": url_publica}
Actions são raras — só use quando o caso de uso é genuinamente acionado pelo usuário, não pelo modelo.
Toda Tool/Function deve ter frontmatter:
"""
title: Nome Amigável
author: Seu Nome / Empresa
author_url: https://sua.url
version: 1.0.0
icon_url: https://url/icone.svg # Aparece ao lado do nome
description: O que faz, em uma frase
required_open_webui_version: 0.9.0
requirements: httpx, pydantic>=2, aiofiles
license: MIT
"""
💡 Dependências automáticas: se você listar
requirements: httpx, aiofiles, a Cortex instala viapipautomaticamente na primeira carga.📌
required_open_webui_version: use0.9.0se seu código depende do runtime async novo (acesso a modelos internos,AsyncSession, etc.). Para plugins simples que só chamam APIs externas, versões menores ainda funcionam — mas recomendamos0.9.0como baseline para garantir comportamento async-first da plataforma.
Padrão aberto da Anthropic para conectar LLMs a fontes externas. A Cortex suporta MCP nativo via HTTP/SSE — basta apontar para o servidor MCP.
Útil para reutilizar servidores da comunidade (filesystem, git, databases) ou de parceiros sem reescrever em Python.
Se você já tem uma API REST documentada com OpenAPI/Swagger, aponte a Cortex para o .json ou .yaml e cada endpoint vira uma tool automaticamente. Zero código de integração.
Configuração: Configurações → Conexões → OpenAPI Server → URL do schema.
Ideal para:
Separadas em container próprio. Usar quando a Tool/Function:
A Cortex fala com o Pipeline via HTTP padrão, mantendo a instância principal leve.
Trade-off: infraestrutura adicional para manter. Só use quando houver justificativa clara — para a maioria dos casos, Tools/Functions resolvem.
async def; libs de I/O são async-native (httpx, aiofiles, asyncpg, aiomysql, redis.asyncio); libs síncronas legadas só via anyio.to_thread.run_syncEsta seção é obrigatória em qualquer implementação da Cortex que habilite tools/functions corporativas.
Todo Tool/Function da comunidade ou de terceiros deve ser revisado antes de importar:
requirements)httpx, requests, urllib)open, os.system, subprocess)eval, exec, pickle, marshal)O diretório de dados da Cortex (database, configurações, tools em cache) é crítico. Se um atacante tem acesso de escrita, pode injetar tools maliciosas que executam com privilégios da plataforma. Trate como tesouro.
A cada trimestre:
__oauth_token__) em vez de service account globalCada invocação de tool/function gera log estruturado:
Esses logs podem ser exportados para SIEM — veja APIs e Integrações e Segurança e Privacidade.
| Antipattern | Por que é ruim | Correção |
|---|---|---|
| Hardcode de API key | Qualquer admin com acesso ao editor vê | Use Valves com Field(default="") |
Execução de string do modelo (eval, exec) |
Execução arbitrária | Valide contra allowlist |
| SQL concatenado | SQL injection | Queries parametrizadas |
| Sem timeout em chamadas externas | Tool trava, modelo trava | httpx.AsyncClient(timeout=...) |
| Retornar traceback para o usuário | Vaza info interna | Capture exceções, retorne mensagem amigável |
| Sem validação de parâmetros | Modelo pode passar valores malucos | min/max, enum, regex no Pydantic |
| Tool com acesso total ao filesystem | Ransomware / exfiltração | Whitelist de diretórios |
| Credenciais OAuth compartilhadas | Sem accountability individual | Use __oauth_token__ |
requests/urllib sync dentro de async def (v0.9.0+) |
Bloqueia o event loop, degrada a plataforma toda | httpx.AsyncClient ou anyio.to_thread.run_sync |
Esquecer await em modelo interno (v0.9.0+) |
Recebe coroutine ao invés do dado, respostas quebradas |
await Users.get_user_by_id(...) — sempre |
get_db_context ou SessionLocal em runtime (v0.9.0+) |
Deadlock contra pool async sob carga | get_async_db_context + async with |
db.query(Model).first() em runtime (v0.9.0+) |
API legada removida, tipo não bate com AsyncSession |
await db.execute(select(Model)) + .scalars().first() |
Segurança funcional:
required_open_webui_version)description clarasrequirements) pinadas em versões testadasCompatibilidade com Cortex 0.9.0+ (async-first):
async defUsers.*, Chats.*, Files.*, Models.*, Functions.*, Tools.*, Knowledges.* tem await na frenteget_db_context, get_db, get_session, SessionLocal em código de runtimeselect(...) + await db.execute(...) (SQLAlchemy 2.0 async)requests, psycopg2, etc.) chamada diretamente dentro de async def sem anyio.to_thread.run_syncRuntimeWarning: coroutine ... was never awaited nos logs)👉 Integre com Governança de IA e IA Responsável para processo formal de aprovação.
A versão 0.9.0 da Cortex trouxe o maior refactor interno da plataforma desde o seu início: toda a camada de acesso a dados agora é assíncrona. Esta seção é o guia definitivo para atualizar Tools, Functions, Pipes, Filters e Actions escritos para versões anteriores (0.4.x, 0.5.x, 0.8.x).
Antes da 0.9.0, os handlers HTTP do FastAPI já eram async, mas a camada de banco usava SQLAlchemy síncrono. Isso forçava wrappers em run_in_threadpool por todo o código e, sob carga, saturava o thread pool — cada chat travava chamadas de DB bloqueantes, estrangulando a concorrência.
A 0.9.0 resolve isso migrando tudo para assíncrono:
| Área | Antes (≤ 0.8.x) | A partir da 0.9.0 |
|---|---|---|
| ORM | SQLAlchemy 1.x síncrono (db.query(...)) |
SQLAlchemy 2.0 async (await db.execute(select(...))) |
| Sessão de DB | Session / get_db_context() |
AsyncSession / get_async_db_context() |
| Métodos de modelo | Users.get_user_by_id(id) (sync) |
await Users.get_user_by_id(id) (coroutine) |
| Helpers utilitários | open_webui.utils.* em grande parte síncronos |
Promovidos a async def quando tocam DB |
| Driver SQLite | sqlite:// direto |
sqlite+aiosqlite:// (reescrito automaticamente) |
| Driver PostgreSQL | psycopg2 |
asyncpg (URLs reescritas automaticamente) |
| SQLCipher | Suportado | Não suportado mais — use SQLite/Postgres padrão |
__event_emitter__ / __event_call__ |
await emitter(...) |
await emitter(...) (inalterado) |
Assinaturas de pipe / inlet / outlet / action / Tools |
async def (desde 0.5) |
async def (inalterado) |
Benefícios diretos para quem escreve plugins:
run_in_threadpool por toda parte.Custo: todo código que encostava no banco precisa ser revisado.
Se sua Tool/Function não acessa modelos internos da Cortex (só chama APIs externas, faz cálculo, processa strings), provavelmente nada quebra. Você pode mesmo assim atualizar required_open_webui_version para 0.9.0 e seguir adiante.
Se sua Tool/Function importa de open_webui.models.* ou chama helpers de open_webui.utils.* que tocam banco, você precisa fazer uma varredura mecânica e adicionar await:
Users.get_user_by_id(...), Chats.get_chat_by_id(...), Files.get_file_by_id(...), Models.get_model_by_id(...), Functions.get_function_by_id(...), Tools.get_tool_by_id(...), Knowledges.get_knowledge_by_id(...) — sem await, você recebe um objeto coroutine, não o dado.open_webui.utils.* ou open_webui.retrieval.* e ele tocava DB, agora precisa de await.AsyncSession. get_db_context saiu do ar para código de runtime — use get_async_db_context. Queries diretas viraram estilo SQLAlchemy 2.0 (select(...) + await db.execute(...)).async, todo caller dele precisa virar async def também.sqlite+sqlcipher:// para criptografia em repouso, fique na 0.8.x ou migre para SQLite/Postgres com criptografia de disco antes de atualizar.await em toda chamada de modeloEsta é a mudança mais comum — e normalmente a única que a maioria dos plugins precisa.
Antes (≤ 0.8.x):
from open_webui.models.users import Users
from open_webui.models.chats import Chats
def resolver_usuario(user_id: str):
user = Users.get_user_by_id(user_id)
chats = Chats.get_chat_list_by_user_id(user_id)
return user, chats
Depois (0.9.0+):
from open_webui.models.users import Users
from open_webui.models.chats import Chats
async def resolver_usuario(user_id: str):
user = await Users.get_user_by_id(user_id)
chats = await Chats.get_chat_list_by_user_id(user_id)
return user, chats
Note que o helper em si virou async def. Quem o chama agora precisa await — e assim por diante subindo pela árvore de chamadas.
get_db_context por get_async_db_contextRaro em plugins, mas alguns Tools abrem sessão própria para queries customizadas. Os helpers síncronos ainda existem, mas são reservados para startup/migração e não devem ser usados em runtime — eles bloqueiam o event loop e brigam com o pool async.
Antes (≤ 0.8.x):
from open_webui.internal.db import get_db_context
from open_webui.models.users import User
def contar_usuarios_ativos():
with get_db_context() as db:
return db.query(User).filter_by(is_active=True).count()
Depois (0.9.0+):
from sqlalchemy import select, func
from open_webui.internal.db import get_async_db_context
from open_webui.models.users import User
async def contar_usuarios_ativos():
async with get_async_db_context() as db:
result = await db.execute(
select(func.count()).select_from(User).where(User.is_active == True)
)
return result.scalar_one()
Diferenças-chave:
with → async withdb.query(Model) → select(Model) (importe de sqlalchemy).first() → result.scalars().first().all() → result.scalars().all().count() → result.scalar_one() em um select(func.count())...sqlalchemy.ext.asyncio.AsyncSession (não sqlalchemy.orm.Session)⚠️ Nunca use
SessionLocal,get_db,get_session,save_config,reset_config(variantes síncronas) de dentro de Tools/Functions. Eles existem apenas para startup e migrações da própria Cortex. Chamá-los em runtime trava o event loop e pode causar deadlock sob carga.
As assinaturas dos entrypoints não mudaram — pipe, inlet, outlet, stream e action já eram async def desde a 0.5.x. O que muda é o corpo.
Antes (≤ 0.8.x) — Pipe que olha o chamador:
from fastapi import Request
from open_webui.models.users import Users
from open_webui.utils.chat import generate_chat_completion
class Pipe:
async def pipe(self, body: dict, __user__: dict, __request__: Request) -> str:
full_user = Users.get_user_by_id(__user__["id"]) # ❌ sem await
body["model"] = "llama3.2:latest"
return await generate_chat_completion(__request__, body, full_user)
Depois (0.9.0+):
from fastapi import Request
from open_webui.models.users import Users
from open_webui.utils.chat import generate_chat_completion
class Pipe:
async def pipe(self, body: dict, __user__: dict, __request__: Request) -> str:
full_user = await Users.get_user_by_id(__user__["id"]) # ✅ com await
body["model"] = "llama3.2:latest"
return await generate_chat_completion(__request__, body, full_user)
Um diff de uma linha no caso mais simples — mas cada chamada de modelo dentro de inlet, outlet, stream e action precisa do mesmo tratamento.
Tools que só fazem cálculo local ou chamam APIs externas não precisam mudar. Tools que acessam modelos internos (Files, Users, Knowledges, etc.) precisam virar async def e usar await.
Antes (≤ 0.8.x) — Tool que lê arquivo anexado:
from open_webui.models.files import Files
class Tools:
def obter_preview_arquivo(self, file_id: str, __user__: dict) -> str:
file = Files.get_file_by_id_and_user_id(file_id, __user__["id"]) # ❌
return file.data.get("content", "") if file else ""
Depois (0.9.0+):
from open_webui.models.files import Files
class Tools:
async def obter_preview_arquivo(self, file_id: str, __user__: dict) -> str:
file = await Files.get_file_by_id_and_user_id(file_id, __user__["id"]) # ✅
return file.data.get("content", "") if file else ""
Se sua Tool expõe rotas FastAPI próprias (caso avançado), troque a dependência e o type hint.
Antes (≤ 0.8.x):
from fastapi import Depends
from sqlalchemy.orm import Session
from open_webui.internal.db import get_session
@router.get("/meu-endpoint")
def meu_endpoint(db: Session = Depends(get_session)):
...
Depois (0.9.0+):
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from open_webui.internal.db import get_async_session
@router.get("/meu-endpoint")
async def meu_endpoint(db: AsyncSession = Depends(get_async_session)):
...
Repetindo para eliminar dúvida: __event_emitter__ e __event_call__ funcionam igual. O construtor interno virou async, mas plugin authors não veem isso — o callable chega pronto na assinatura do método.
await __event_emitter__({"type": "status", "data": {"description": "...", "done": False}})
resposta = await __event_call__({"type": "input", "data": {...}})
Nem toda biblioteca tem API async. SDKs legados, drivers de sistemas antigos, bibliotecas de cálculo intensivo — muitos ainda são síncronos. Não os chame diretamente dentro de um método async def — eles bloqueiam o event loop e afetam toda a plataforma.
Envolva em um worker thread:
import anyio
# Em vez de:
# resultado = legacy_client.fetch(url) # ❌ bloqueia o loop
# Faça:
resultado = await anyio.to_thread.run_sync(legacy_client.fetch, url) # ✅
Quando houver alternativa async-native, prefira-a:
| Sync legado | Substituto async-native |
|---|---|
requests |
httpx.AsyncClient |
urllib |
httpx.AsyncClient ou aiohttp |
psycopg2 |
asyncpg |
pymysql |
aiomysql / asyncmy |
redis sync |
redis.asyncio |
open() para I/O intensivo |
aiofiles |
boto3 |
aioboto3 |
smtplib |
aiosmtplib |
Preocupação primariamente do operador da Cortex, mas plugins que fixam URL de DB própria precisam seguir:
aiosqlite (URLs sqlite:// são reescritas para sqlite+aiosqlite:// em runtime).asyncpg (URLs postgresql://, postgresql+psycopg2:// e postgres:// são reescritas para postgresql+asyncpg://).sqlite+sqlcipher://) — não suportado em 0.9.0+. Permaneça em 0.8.x ou migre para SQLite/Postgres padrão antes de atualizar.Na prática, 80% dos plugins migram assim:
from open_webui.models → revisar cada chamada, adicionar awaitfrom open_webui.utils / from open_webui.retrieval → revisar cada uso, adicionar await onde toca DBdb.query( → reescrever em estilo select(...) + await db.execute(...)get_db_context → trocar por get_async_db_context + async withget_db, get_session, SessionLocal → remover de código de runtimeasync def em todos os helpers que agora usam awaitrequests.get, requests.post, urllib.request → trocar por httpx.AsyncClientrequired_open_webui_version: 0.9.0 no frontmatterversion: da tool/function (1.x.0 → 2.0.0 se houver breaking change interno)<coroutine object ... at 0x...> na resposta ou como TypeError: object ... is not subscriptable.anyio.to_thread.run_sync, o gargalo aparece com ~3+ usuários concorrentes.RuntimeWarning: coroutine '...' was never awaited. Se aparecer, você esqueceu um await.Verifique:
Verifique o retorno. Agentes em Native Mode podem tratar retornos sem erro como sucesso:
# ❌ Retorna erro como string, agente não identifica
async def tool_x(self) -> str:
return "Erro: CPF inválido"
# ✅ Retorno estruturado facilita o modelo tratar
async def tool_x(self) -> dict:
return {"status": "error", "message": "CPF inválido"}
status, notification, citation funcionam bem.message/replace são sobrescritos pelo streaming do modelo.timeout em todas as chamadas externasasync/await corretamente (não use requests, use httpx)status eventos para o usuário ver progressoPipe, Filter ou ActionValves deve herdar de BaseModel (Pydantic)Field(default=..., description="...")__init__: self.valves = self.Valves()Você está chamando um método de modelo sem await. Na 0.9.0+ todo método de modelo é async def e retorna coroutine.
# ❌ Esquece o await — user fica um objeto coroutine, não o usuário
user = Users.get_user_by_id(user_id)
# ✅
user = await Users.get_user_by_id(user_id)
Se o await está dentro de um helper seu, esse helper precisa ser async def também — e quem chama o helper precisa await dele. Async propaga para cima.
Mesmo sintoma acima, detectado mais a jusante. Algo como user["email"] onde user é na verdade uma coroutine não awaited. Adicione o await na chamada original.
Você está usando API síncrona do SQLAlchemy 1.x (db.query(Model)...) numa sessão async. Reescreva para estilo 2.0:
# ❌
users = db.query(User).filter_by(is_active=True).all()
# ✅
result = await db.execute(select(User).where(User.is_active == True))
users = result.scalars().all()
Sinais típicos:
asyncio, coroutine, AsyncSession<coroutine object ...>)ImportError: cannot import name 'get_db_context' from 'open_webui.internal.db' (helper renomeado)Siga o checklist em Migração para Cortex 0.9.0+.
Sua Tool provavelmente chama biblioteca síncrona (requests, psycopg2, leitura de arquivo grande com open()) de dentro de um método async def. Isso bloqueia o event loop da Cortex toda até a chamada terminar — todos os outros chats ficam pausados.
Corrija de uma de duas maneiras:
# Opção A: troque por lib async-native
import httpx
async with httpx.AsyncClient(timeout=10) as client:
r = await client.get(url)
# Opção B: envolva a lib síncrona em worker thread
import anyio
resultado = await anyio.to_thread.run_sync(legacy_sync_fn, arg1, arg2)
A 0.9.0+ não suporta mais sqlite+sqlcipher://. Não existe driver SQLCipher com API async compatível com SQLAlchemy 2.0.
Opções:
Crie uma tool de debug que emite várias mensagens:
async def debug_env(
self,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__=None,
) -> dict:
"""Ferramenta de debug — imprime contexto injetado pela Cortex."""
return {
"user_id": __user__.get("id") if __user__ else None,
"user_email": __user__.get("email") if __user__ else None,
"user_role": __user__.get("role") if __user__ else None,
"has_valves": bool(__user__.get("valves")) if __user__ else False,
"function_calling": (
__metadata__.get("params", {}).get("function_calling", "default")
if __metadata__ else None
),
"event_emitter_available": __event_emitter__ is not None,
}
Tool pode retornar URLs de arquivos gerados (via __event_emitter__ com tipo files):
await __event_emitter__({
"type": "files",
"data": {"files": [{"name": "relatorio.pdf", "url": "/files/abc"}]}
})
Em Native Mode, o modelo pode chamar múltiplas tools em sequência sem intervenção do usuário:
Isso é interleaved thinking / agentic behavior — funciona bem com modelos top (gpt-5, claude-opus-4-7, grok-4.20-reasoning).
Combine __event_call__ com confirmation / input para casos críticos:
Para tools com chamadas repetidas e resultado estável (ex.: cotação do dia):
from functools import lru_cache
from datetime import datetime
class Tools:
def __init__(self):
self._cache = {}
async def cotacao(self, moeda: str) -> dict:
hoje = datetime.now().date().isoformat()
chave = (moeda, hoje)
if chave in self._cache:
return self._cache[chave]
resultado = await self._fetch(moeda)
self._cache[chave] = resultado
return resultado
💬 Precisa de apoio para desenvolver tools específicas ou revisar arquitetura de extensibilidade? A SinapseTech oferece consultoria dedicada em desenvolvimento de plugins Python para Cortex, revisão de segurança, governança de extensões. Fale com a equipe via Atendimento e Suporte.
📖 Documentação complementar (especificações internas, templates oficiais de Tools/Functions corporativos, catálogo de exemplos prontos para uso) está disponível sob NDA para clientes enterprise.