AMQP Gateway: Technical Architecture

Status: ✅ Implemented
Date: April 4, 2026
Type: Technical architecture
Audience: Development team


1. Summary

dkv-pet-flows-api includes a native AMQP gateway that replaces the previous Dapr RabbitMQ binding. It uses SmallRye RabbitMQ (the official Quarkus extension) to publish and consume AMQP 0.9.1 messages directly, with no intermediate proxies or sidecars.

Motivation

| Previous problem | Current solution |
|---|---|
| netcomp published directly to the RabbitMQ Management API (:15672) | netcomp publishes to the dkv-pet-flows REST endpoint (/api/v1/events) |
| The consumer depended on the Dapr sidecar binding | Native SmallRye @Incoming consumer |
| No dynamic subscription capability | REST API for subscribe/unsubscribe with webhooks |
| No control over the connection lifecycle | SmallRye handles automatic reconnection with backoff |

2. Architecture

flowchart LR
    classDef ext fill:#e1f5fe,stroke:#0288d1,stroke-width:2px,color:#000;
    classDef api fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#000;
    classDef amqp fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#000;
    classDef core fill:#ede7f6,stroke:#512da8,stroke-width:2px,color:#000;
    classDef db fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#000;

    NC["netcomp\n(Erlang)"]:::ext
    RMQ[("RabbitMQ\n(AMQP 0.9.1)")]:::db
    SUBS["Subscribers"]:::ext

    subgraph PF ["dkv-pet-flows-api"]
        REST["REST API\nController"]:::api
        PUB["SmallRye AMQP\nPublisher"]:::amqp
        CONS["SmallRye AMQP\nConsumer"]:::amqp
        DISP["Event\nDispatch"]:::core
        WT["Workflow\nTrigger"]:::core
        WHF["Webhook\nForward"]:::core
        SUB_API["Subscription API\n(POST/GET/DELETE)"]:::api

        REST --> PUB
        CONS --> DISP
        DISP --> WT
        DISP --> WHF
    end

    NC -- "POST /api/v1/events\nEncounterEvent" --> REST
    PUB -- "publish" --> RMQ
    RMQ -- "push" --> CONS
    WHF -- "POST webhookUrl" --> SUBS

Step-by-step data flow

  1. netcomp creates an event in ES via create_event/2.
  2. The maybe_publish_to_rabbitmq/2 hook spawns a process that sends an HTTP POST to /api/v1/events.
  3. EventPublishResource receives the flat JSON, validates it, and publishes it to RabbitMQ through a SmallRye Emitter.
  4. RabbitMQ persists the message in the pet-flows-events queue (delivery mode 2, persistent).
  5. EncounterEventConsumer receives the message via @Incoming("encounter-events").
  6. The consumer runs two actions in parallel:
     - It schedules a PetAbandonedConversationWorkflow via Dapr.
     - It calls EventDispatcher.forward() to forward the event to the registered webhooks.
  7. message.ack() is called to confirm processing.
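
The parallel processing and final acknowledgement described above can be sketched with plain JDK concurrency. This is a minimal illustration, not the actual EncounterEventConsumer: the class name ParallelProcessing and the three Runnable parameters are hypothetical stand-ins for the workflow scheduling, the webhook forwarding and the message.ack() call. Skipping the ack when either action fails is an assumption of this sketch; the source does not describe the failure path.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: runs both actions concurrently and acknowledges only after both succeed. */
final class ParallelProcessing {

    private ParallelProcessing() {}

    static CompletableFuture<Void> process(Runnable scheduleWorkflow,
                                           Runnable forwardToWebhooks,
                                           Runnable ack) {
        // Each action runs on the common pool, independently of the other.
        CompletableFuture<Void> workflow = CompletableFuture.runAsync(scheduleWorkflow);
        CompletableFuture<Void> webhooks = CompletableFuture.runAsync(forwardToWebhooks);

        // allOf completes once both futures are done. If either failed,
        // thenRun is skipped, so the ack is not sent (assumption of this sketch).
        return CompletableFuture.allOf(workflow, webhooks).thenRun(ack);
    }
}
```

A caller would pass the real actions as lambdas and either join the returned future or return it to a reactive framework as the completion signal.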

3. Sequence Diagrams

3.1 Event Publication

sequenceDiagram
    participant NC as netcomp
    participant PF as dkv-pet-flows REST
    participant EM as SmallRye Emitter
    participant RMQ as RabbitMQ

    NC->>NC: create_event → persist ES → get EvId
    NC->>PF: POST /api/v1/events<br/>flat EncounterEvent JSON
    PF->>PF: Validate EncounterEvent
    PF->>EM: emitter.send(event)
    EM->>RMQ: basic.publish(exchange=pet.events, rk=encounter.tmipw_expired)
    PF-->>NC: 202 Accepted
    Note over RMQ: Message persisted (delivery_mode=2)

3.2 Consumption and Dispatch

sequenceDiagram
    participant RMQ as RabbitMQ
    participant SR as SmallRye Consumer
    participant DSP as EventDispatcher
    participant WF as Workflow Engine
    participant WH as Webhook Subscribers

    RMQ->>SR: basic.deliver(message)
    SR->>DSP: dispatch(EncounterEvent)

    par Internal processing
        DSP->>WF: scheduleNewWorkflow(event)
    and Webhook forwarding
        DSP->>DSP: lookup matching subscriptions
        loop For each subscriber
            DSP->>WH: POST webhookUrl (event payload)
            WH-->>DSP: 200 OK
        end
    end

    SR-->>RMQ: basic.ack
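
The webhook forwarding step in the diagram above can be sketched with the JDK HttpClient, which the file list names as the client used by EventDispatcher. This is an illustrative sketch under assumptions: the class name WebhookForwarder, both timeout values, and the choice to discard the response body are hypothetical and not taken from the source.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/** Hypothetical forwarder: POSTs the event JSON to a subscriber's webhookUrl. */
final class WebhookForwarder {

    // Assumed connect timeout; the source does not specify one.
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(2))
            .build();

    /** Builds the POST request; kept separate so it can be inspected without network access. */
    static HttpRequest buildRequest(String webhookUrl, String eventJson) {
        return HttpRequest.newBuilder(URI.create(webhookUrl))
                .timeout(Duration.ofSeconds(5)) // assumed per-request timeout
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(eventJson))
                .build();
    }

    /** Sends the request asynchronously and yields the HTTP status code. */
    CompletableFuture<Integer> forward(String webhookUrl, String eventJson) {
        return client
                .sendAsync(buildRequest(webhookUrl, eventJson), HttpResponse.BodyHandlers.discarding())
                .thenApply(HttpResponse::statusCode);
    }
}
```

Sending asynchronously keeps one slow subscriber from blocking the loop over the remaining subscribers; the caller decides how to treat non-2xx status codes.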

3.3 Dynamic Subscription

sequenceDiagram
    participant CLI as External service
    participant API as Subscription API
    participant REG as SubscriptionRegistry
    participant RMQ as RabbitMQ

    CLI->>API: POST /api/v1/subscriptions<br/>routingKeyPattern + webhookUrl
    API->>REG: register(subscription)
    REG->>REG: Store in memory + persist JSON
    API-->>CLI: 201 Created (id, status=ACTIVE)

    Note over CLI,RMQ: Messages that match the pattern are forwarded to the webhook
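
The source does not say how a routingKeyPattern is matched against a message's routing key. A minimal sketch, assuming the registry follows AMQP topic-exchange semantics (`*` matches exactly one dot-separated word, `#` matches zero or more words); the class name TopicPatterns is hypothetical.

```java
/** Hypothetical matcher using AMQP topic-exchange wildcard rules. */
final class TopicPatterns {

    private TopicPatterns() {}

    static boolean matches(String pattern, String routingKey) {
        // limit -1 keeps trailing empty words, so "a." is not treated as "a"
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under these rules the pattern `encounter.*` from the subscription example matches `encounter.tmipw_expired` but not a three-word key such as `encounter.a.b`.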

3.4 Unsubscribe

sequenceDiagram
    participant CLI as External service
    participant API as Subscription API
    participant REG as SubscriptionRegistry

    CLI->>API: DELETE /api/v1/subscriptions/{id}
    API->>REG: remove(id)
    API-->>CLI: 204 No Content

3.5 Broker Reconnection

sequenceDiagram
    participant PF as dkv-pet-flows
    participant SR as SmallRye Connector
    participant RMQ as RabbitMQ

    Note over PF,RMQ: Normal operation
    RMQ--xSR: TCP connection lost
    SR->>SR: Log warning, start reconnection
    loop Exponential backoff (built-in SmallRye)
        SR->>RMQ: AMQP connection attempt
        RMQ--xSR: Connection refused
        SR->>SR: Wait 1s, 2s, 4s, 8s...
    end
    SR->>RMQ: AMQP connection established
    RMQ-->>SR: CONNECTED
    SR->>RMQ: Re-declare queues + re-subscribe
    Note over PF,RMQ: Consumer active again
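
The wait sequence shown in the diagram (1s, 2s, 4s, 8s...) is a doubling schedule. The sketch below only illustrates how such a schedule is computed; it is not SmallRye code. The class name BackoffSchedule and the upper bound parameter are assumptions, since the source does not mention a cap.

```java
import java.time.Duration;

/** Hypothetical helper that computes a capped, doubling delay per reconnection attempt. */
final class BackoffSchedule {

    private BackoffSchedule() {}

    /** attempt is zero-based: 0 gives base, 1 gives 2 x base, and so on, never exceeding max. */
    static Duration delay(int attempt, Duration base, Duration max) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Bound the shift so the multiplier itself stays well within a long.
        long multiplier = 1L << Math.min(attempt, 30);
        long millis = Math.min(base.toMillis() * multiplier, max.toMillis());
        return Duration.ofMillis(millis);
    }
}
```

With a base of one second, attempts 0 through 3 yield 1s, 2s, 4s and 8s, matching the diagram; the cap keeps a long outage from producing unbounded waits.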

4. REST API

4.1 Event Publication

POST /api/v1/events
Content-Type: application/json

{
    "event_id": "abc-123",
    "type": "encounter",
    "subtype": "tmipw_expired",
    "encounter_id": "enc-456",
    "queue_id": "q-789",
    "patient_id": "p-012",
    "brand_id": "b-345",
    "datetime": 1712345678000
}

→ 202 Accepted
{
    "routed": true,
    "routingKey": "encounter.tmipw_expired",
    "eventId": "abc-123"
}
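
In the request and response above, the routingKey equals the event's type and subtype joined by a dot. A minimal sketch of an eight-field record with that derivation follows. The camelCase component names, their Java types and the validation rules are assumptions; the actual EncounterEvent record may differ, and the JSON-to-record mapping (for example snake_case property names) is omitted.

```java
import java.util.Objects;

/** Hypothetical shape of the event payload; components mirror the eight JSON properties. */
record EncounterEvent(String eventId, String type, String subtype, String encounterId,
                      String queueId, String patientId, String brandId, long datetime) {

    // Compact constructor: assumed minimal validation before publishing.
    EncounterEvent {
        Objects.requireNonNull(eventId, "event_id is required");
        if (type == null || type.isBlank()) {
            throw new IllegalArgumentException("type is required");
        }
        if (subtype == null || subtype.isBlank()) {
            throw new IllegalArgumentException("subtype is required");
        }
    }

    /** Derives the routing key as "type.subtype", consistent with the example response. */
    String routingKey() {
        return type + "." + subtype;
    }
}
```

For the example payload, `routingKey()` returns `encounter.tmipw_expired`, which the consumer binding `encounter.*` would match.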

4.2 Dynamic Subscriptions

POST /api/v1/subscriptions
Content-Type: application/json

{
    "routingKeyPattern": "encounter.*",
    "webhookUrl": "http://my-service:8080/callback"
}

→ 201 Created
{
    "id": "uuid",
    "routingKeyPattern": "encounter.*",
    "webhookUrl": "http://my-service:8080/callback",
    "createdAt": "2026-04-04T17:10:00Z",
    "status": "ACTIVE"
}

GET /api/v1/subscriptions → Full list
GET /api/v1/subscriptions/{id} → Detail
DELETE /api/v1/subscriptions/{id} → 204 No Content

5. Implemented Files

Java (dkv-pet-flows-api)

| File | Package | Role |
|---|---|---|
| EventPublishResource.java | com.dkv.pet.flows.api | REST endpoint /api/v1/events |
| SubscriptionResource.java | com.dkv.pet.flows.api | REST CRUD /api/v1/subscriptions |
| EncounterEventConsumer.java | com.dkv.pet.flows.messaging | @Incoming AMQP consumer |
| EventDispatcher.java | com.dkv.pet.flows.messaging | Webhook forwarding (JDK 21 HttpClient) |
| SubscriptionRegistry.java | com.dkv.pet.flows.messaging | In-memory registry + JSON file backup |
| Subscription.java | com.dkv.pet.flows.domain | Record: id, routingKeyPattern, webhookUrl, createdAt, status |
| SubscribeRequest.java | com.dkv.pet.flows.domain | Validated request record |
| EncounterEvent.java | com.dkv.pet.flows.domain | Record: 8 fields (Javadoc updated) |

Erlang (netcomp)

| File | Function | Change |
|---|---|---|
| dkv_telemed_util.erl | publish_to_rabbitmq/2 | URL → pet_flows_events_url, body → flat JSON |

Removed

| File | Reason |
|---|---|
| EventWorkflowTrigger.java | Replaced by EncounterEventConsumer |
| dapr/components/rabbitmq-binding.yaml | Replaced by SmallRye config in application.properties |

6. Configuration

application.properties (Quarkus)

# RabbitMQ connection
rabbitmq-host=${RABBITMQ_HOST:localhost}
rabbitmq-port=${RABBITMQ_PORT:5672}
rabbitmq-username=${RABBITMQ_USERNAME:guest}
rabbitmq-password=${RABBITMQ_PASSWORD:guest}

# Consumer (inbound)
mp.messaging.incoming.encounter-events.connector=smallrye-rabbitmq
mp.messaging.incoming.encounter-events.exchange.name=pet.events
mp.messaging.incoming.encounter-events.exchange.type=topic
mp.messaging.incoming.encounter-events.routing-keys=encounter.*
mp.messaging.incoming.encounter-events.queue.name=pet-flows-events
mp.messaging.incoming.encounter-events.queue.durable=true

# Publisher (outbound)
mp.messaging.outgoing.event-outbound.connector=smallrye-rabbitmq
mp.messaging.outgoing.event-outbound.exchange.name=pet.events
mp.messaging.outgoing.event-outbound.exchange.type=topic

Erlang (sys.config)

{dkv_telemed, [
    {rabbitmq_events_enabled, true},           %% Feature flag
    {pet_flows_events_url,                     %% dkv-pet-flows URL
        <<"http://dkv-pet-flows:8080/api/v1/events">>}
]}

7. State and Scalability

| Aspect | Solution |
|---|---|
| Subscriptions | In-memory ConcurrentHashMap + backup to a local JSON file |
| State loss on restart | The JSON file is reloaded at startup (@PostConstruct) |
| Horizontal scalability | Each instance has its own subscriptions (sufficient for the MVP Backlog) |
| Dapr | Still active for workflows (DaprWorkflowClient). Only the RabbitMQ binding was removed |
| Reconnection | SmallRye handles it automatically with exponential backoff |
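
The in-memory part of the subscription state can be sketched as follows. This is an illustration under assumptions, not the actual SubscriptionRegistry: the class name InMemorySubscriptionRegistry, the method names and the component types of the record are hypothetical (only the record's field names come from the file list), and the JSON file backup and the @PostConstruct reload are omitted.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Field names follow the file list; the component types are assumed. */
record Subscription(String id, String routingKeyPattern, String webhookUrl,
                    Instant createdAt, String status) {}

/** Hypothetical registry: thread-safe storage keyed by subscription id. */
final class InMemorySubscriptionRegistry {

    private final Map<String, Subscription> byId = new ConcurrentHashMap<>();

    Subscription register(String routingKeyPattern, String webhookUrl) {
        Subscription created = new Subscription(
                UUID.randomUUID().toString(), routingKeyPattern, webhookUrl, Instant.now(), "ACTIVE");
        byId.put(created.id(), created);
        return created;
    }

    Optional<Subscription> find(String id) {
        return Optional.ofNullable(byId.get(id));
    }

    /** Returns an immutable snapshot so callers cannot mutate the registry. */
    List<Subscription> list() {
        return List.copyOf(byId.values());
    }

    /** Returns true if a subscription with that id existed and was removed. */
    boolean remove(String id) {
        return byId.remove(id) != null;
    }
}
```

Because the map lives inside one process, two instances of the service would each hold a separate set of subscriptions, which is the per-instance limitation noted in the table above.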

Related Documents

| Document | Link |
|---|---|
| MVP: Dinamización | mvp_dinamizacion.md |
| System Architecture | arquitectura_sistema.md |
| Dapr Workflows Spike | dapr_workflows_spike.md |
| DKV Pet Cloud Research | investigacion_dkv_pet_cloud.md |