AMQP Gateway — Arquitectura Técnica
Estado: ✅ Implementado
Fecha: 4 de abril de 2026
Tipo: Arquitectura técnica
Audiencia: Equipo de desarrollo
1. Resumen
dkv-pet-flows-api integra un AMQP Gateway nativo que reemplaza al anterior Dapr RabbitMQ binding. Utiliza SmallRye RabbitMQ (extensión oficial de Quarkus) para publicar y consumir mensajes AMQP 0.9.1 directamente, sin proxies ni sidecars intermedios.
Motivación
| Problema anterior |
Solución actual |
| netcomp publicaba directo a RabbitMQ Management API (:15672) |
netcomp publica a dkv-pet-flows REST (/api/v1/events) |
| Consumer dependía de Dapr sidecar binding |
Consumer nativo SmallRye @Incoming |
| Sin capacidad de suscripción dinámica |
REST API de subscribe/unsubscribe con webhooks |
| Sin control del lifecycle de conexión |
SmallRye maneja reconexión automática con backoff |
2. Arquitectura
flowchart LR
classDef ext fill:#e1f5fe,stroke:#0288d1,stroke-width:2px,color:#000;
classDef api fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#000;
classDef amqp fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#000;
classDef core fill:#ede7f6,stroke:#512da8,stroke-width:2px,color:#000;
classDef db fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#000;
NC["netcomp\n(Erlang)"]:::ext
RMQ[("RabbitMQ\n(AMQP 0.9.1)")]:::db
SUBS["Subscribers"]:::ext
subgraph PF ["dkv-pet-flows-api"]
REST["REST API\nController"]:::api
PUB["SmallRye AMQP\nPublisher"]:::amqp
CONS["SmallRye AMQP\nConsumer"]:::amqp
DISP["Event\nDispatch"]:::core
WT["Workflow\nTrigger"]:::core
WHF["Webhook\nForward"]:::core
SUB_API["Subscription API\n(POST/GET/DELETE)"]:::api
REST --> PUB
CONS --> DISP
DISP --> WT
DISP --> WHF
end
NC -- "POST /api/v1/events\nEncounterEvent" --> REST
PUB -- "publish" --> RMQ
RMQ -- "push" --> CONS
WHF -- "POST webhookUrl" --> SUBS
Flujo de datos paso a paso
- netcomp crea un evento en ES vía
create_event/2
- El hook
maybe_publish_to_rabbitmq/2 hace spawn + HTTP POST a /api/v1/events
- EventPublishResource recibe el JSON plano, lo valida, y lo publica a RabbitMQ vía SmallRye
Emitter
- RabbitMQ persiste el mensaje en la cola
pet-flows-events (durability mode 2)
- EncounterEventConsumer recibe el mensaje vía
@Incoming("encounter-events")
- El consumer ejecuta dos acciones en paralelo:
- Agenda un
PetAbandonedConversationWorkflow vía Dapr
- Invoca
EventDispatcher.forward() para reenviar a webhooks registrados
- Se hace
message.ack() para confirmar el procesamiento
3. Diagramas de Secuencia
3.1 Publicación de Evento
sequenceDiagram
participant NC as netcomp
participant PF as dkv-pet-flows REST
participant EM as SmallRye Emitter
participant RMQ as RabbitMQ
NC->>NC: create_event → persist ES → get EvId
NC->>PF: POST /api/v1/events<br/>EncounterEvent JSON plano
PF->>PF: Validate EncounterEvent
PF->>EM: emitter.send(event)
EM->>RMQ: basic.publish(exchange=pet.events, rk=encounter.tmipw_expired)
PF-->>NC: 202 Accepted
Note over RMQ: Message persisted (delivery_mode=2)
3.2 Consumo y Dispatch
sequenceDiagram
participant RMQ as RabbitMQ
participant SR as SmallRye Consumer
participant DSP as EventDispatcher
participant WF as Workflow Engine
participant WH as Webhook Subscribers
RMQ->>SR: basic.deliver(message)
SR->>DSP: dispatch(EncounterEvent)
par Procesamiento interno
DSP->>WF: scheduleNewWorkflow(event)
and Webhook forwarding
DSP->>DSP: lookup matching subscriptions
loop Para cada suscriptor
DSP->>WH: POST webhookUrl (event payload)
WH-->>DSP: 200 OK
end
end
SR-->>RMQ: basic.ack
3.3 Suscripción Dinámica
sequenceDiagram
participant CLI as Servicio externo
participant API as Subscription API
participant REG as SubscriptionRegistry
participant RMQ as RabbitMQ
CLI->>API: POST /api/v1/subscriptions<br/>routingKeyPattern + webhookUrl
API->>REG: register(subscription)
REG->>REG: Store in memory + persist JSON
API-->>CLI: 201 Created (id, status=ACTIVE)
Note over CLI,RMQ: Mensajes que matcheen el patrón se reenvían al webhook
3.4 Unsubscribe
sequenceDiagram
participant CLI as Servicio externo
participant API as Subscription API
participant REG as SubscriptionRegistry
CLI->>API: DELETE /api/v1/subscriptions/{id}
API->>REG: remove(id)
API-->>CLI: 204 No Content
3.5 Reconexión del Broker
sequenceDiagram
participant PF as dkv-pet-flows
participant SR as SmallRye Connector
participant RMQ as RabbitMQ
Note over PF,RMQ: Operación normal
RMQ--xSR: Conexión TCP perdida
SR->>SR: Log warning, iniciar reconexión
loop Backoff exponencial (built-in SmallRye)
SR->>RMQ: Intento conexión AMQP
RMQ--xSR: Connection refused
SR->>SR: Wait 1s, 2s, 4s, 8s...
end
SR->>RMQ: Conexión AMQP establecida
RMQ-->>SR: CONNECTED
SR->>RMQ: Re-declarar colas + re-suscribir
Note over PF,RMQ: Consumer activo de nuevo
4. API REST
4.1 Publicación de Eventos
POST /api/v1/events
Content-Type: application/json
{
"event_id": "abc-123",
"type": "encounter",
"subtype": "tmipw_expired",
"encounter_id": "enc-456",
"queue_id": "q-789",
"patient_id": "p-012",
"brand_id": "b-345",
"datetime": 1712345678000
}
→ 202 Accepted
{
"routed": true,
"routingKey": "encounter.tmipw_expired",
"eventId": "abc-123"
}
4.2 Suscripciones Dinámicas
POST /api/v1/subscriptions
Content-Type: application/json
{
"routingKeyPattern": "encounter.*",
"webhookUrl": "http://my-service:8080/callback"
}
→ 201 Created
{
"id": "uuid",
"routingKeyPattern": "encounter.*",
"webhookUrl": "http://my-service:8080/callback",
"createdAt": "2026-04-04T17:10:00Z",
"status": "ACTIVE"
}
GET /api/v1/subscriptions → Lista completa
GET /api/v1/subscriptions/{id} → Detalle
DELETE /api/v1/subscriptions/{id} → 204 No Content
5. Ficheros Implementados
Java (dkv-pet-flows-api)
| Fichero |
Paquete |
Rol |
EventPublishResource.java |
com.dkv.pet.flows.api |
REST endpoint /api/v1/events |
SubscriptionResource.java |
com.dkv.pet.flows.api |
REST CRUD /api/v1/subscriptions |
EncounterEventConsumer.java |
com.dkv.pet.flows.messaging |
@Incoming AMQP consumer |
EventDispatcher.java |
com.dkv.pet.flows.messaging |
Webhook forwarding (JDK 21 HttpClient) |
SubscriptionRegistry.java |
com.dkv.pet.flows.messaging |
Registry in-memory + JSON file backup |
Subscription.java |
com.dkv.pet.flows.domain |
Record: id, routingKeyPattern, webhookUrl, createdAt, status |
SubscribeRequest.java |
com.dkv.pet.flows.domain |
Record validado de request |
EncounterEvent.java |
com.dkv.pet.flows.domain |
Record: 8 campos (actualizado javadoc) |
Erlang (netcomp)
| Fichero |
Función |
Cambio |
dkv_telemed_util.erl |
publish_to_rabbitmq/2 |
URL → pet_flows_events_url, body → JSON plano |
Eliminados
| Fichero |
Razón |
EventWorkflowTrigger.java |
Reemplazado por EncounterEventConsumer |
dapr/components/rabbitmq-binding.yaml |
Reemplazado por config SmallRye en application.properties |
6. Configuración
application.properties (Quarkus)
# Conexión RabbitMQ
rabbitmq-host=${RABBITMQ_HOST:localhost}
rabbitmq-port=${RABBITMQ_PORT:5672}
rabbitmq-username=${RABBITMQ_USERNAME:guest}
rabbitmq-password=${RABBITMQ_PASSWORD:guest}
# Consumer (inbound)
mp.messaging.incoming.encounter-events.connector=smallrye-rabbitmq
mp.messaging.incoming.encounter-events.exchange.name=pet.events
mp.messaging.incoming.encounter-events.exchange.type=topic
mp.messaging.incoming.encounter-events.routing-keys=encounter.*
mp.messaging.incoming.encounter-events.queue.name=pet-flows-events
mp.messaging.incoming.encounter-events.queue.durable=true
# Publisher (outbound)
mp.messaging.outgoing.event-outbound.connector=smallrye-rabbitmq
mp.messaging.outgoing.event-outbound.exchange.name=pet.events
mp.messaging.outgoing.event-outbound.exchange.type=topic
Erlang (sys.config)
{dkv_telemed, [
{rabbitmq_events_enabled, true}, %% Feature flag
{pet_flows_events_url, %% URL de dkv-pet-flows
<<"http://dkv-pet-flows:8080/api/v1/events">>}
]}
7. Estado y Escalabilidad
| Aspecto |
Solución |
| Suscripciones |
ConcurrentHashMap in-memory + backup a fichero JSON local |
| Pérdida de estado al restart |
El fichero JSON se recarga al arrancar (@PostConstruct) |
| Escalabilidad horizontal |
Cada instancia tiene sus propias suscripciones (suficiente para MVP Backlog) |
| Dapr |
Sigue activo para workflows (DaprWorkflowClient). Solo se eliminó el binding RabbitMQ |
| Reconexión |
SmallRye maneja automáticamente con backoff exponencial |
Documentos Relacionados