Saltar a contenido

Investigación: Alta Concurrencia y Agregación (Debounce/Conflation) en Quarkus Mutiny

Este documento analiza el escenario propuesto: un paciente enviando >400 mensajes por segundo desde la UI (dkv-client), generando un "event flood" (inundación de eventos) en RabbitMQ, y cómo esto impacta a nuestra arquitectura actual basada en Quarkus y Dapr Workflows.

1. El Problema Base: Impacto en Dapr Workflows

Actualmente, el EncounterEventConsumer intercepta cada mensaje AMQP y evalúa si debe interactuar con Dapr. En el escenario de 400 mensajes / segundo de un mismo paciente (mismo encounterId):

  1. Si creamos Workflows nuevos por cada mensaje: Enviaremos 400 peticiones gRPC/s al Dapr Sidecar. Se instanciarán 400 workflows independientes que intentarán lanzar notificaciones simultáneas. Esto saturará la base de datos (State Store) de Dapr rápidamente.
  2. Si el Workflow ya existe y usamos Dapr "Raise Event" (Signals): Dapr Durable Task se basa en Event Sourcing. Cada vez que envías una señal externa a un workflow para decirle "el paciente ha escrito, resetea el timer de inactividad", Dapr escribe una nueva fila en el historial de ese workflow en disco. ¡400 eventos/s generarán un historial masivo en milisegundos, degradando la memoria al hacer "replay" del workflow!

[!WARNING] Resulta vital no derivar eventos de "alta frecuencia innecesarios" al motor de orquestación de Dapr. Dapr es excelente gestionando estado y timers a largo plazo (horas/días), pero no está diseñado para ingerir ráfagas continuas de alta frecuencia para la misma entidad en tiempo real.

La solución debe implementarse antes de llegar a Dapr, en la capa de mensajería (SmallRye / Mutiny).


2. Técnicas de Agregación en Quarkus (SmallRye Mutiny)

SmallRye Reactive Messaging junto con Mutiny ofrecen herramientas sofisticadas de manejo de tiempo y backpressure para colapsar (conflate) o agrupar streams reactivos.

A continuación, los tres enfoques principales para resolver este reto:

Opción A: KeyedMulti + Debounce (Recomendado)

En lugar de procesar los eventos de RabbitMQ uno por uno, la especificación de SmallRye permite particionar el flujo de eventos según una clave y aplicar un retraso de silencio (debounce).

@Incoming("encounter-events")
@Outgoing("throttled-events") // Solo los eventos "filtrados" pasan
public Multi<EncounterEvent> throttleMessages(KeyedMulti<String, JsonObject> keyedStream) {
    // La infraestructura extrae el encounterId y procesa cada grupo de manera independiente

    return keyedStream.onItem().transformToMultiAndMerge(encounterGroup -> 
        encounterGroup
            // El operador mágico: Se traga los eventos que lleguen 
            // con menos de 1 segundo de diferencia del anterior. 
            // Si el paciente escribe 400 veces en 1s, sólo emite el ÚLTIMO evento tras silenciarse 1s.
            .debounce().ofDuration(Duration.ofSeconds(1))
            .onItem().transform(json -> json.mapTo(EncounterEvent.class))
    );
}

// Otro método procesa los eventos calmados y llama a Dapr
@Incoming("throttled-events")
public CompletionStage<Void> consumeThrottled(Message<EncounterEvent> msg) {
    // Aquí invocamos a Dapr con total seguridad (máx 1 evento/segundo por chat)
}

Ventajas: - Cancela por completo la carga de Workflows "basura". - Resuelve exactamente tu problema (400 pulsaciones/s se reducen a 1 solo evento emitido temporalmente). - Gestión en memoria altamente optimizada (uso de threads del Event-Loop original en Vert.x).

Opción B: Batching / Buffering (Ventaneado temporal)

Si en vez de "eliminar eventos intermedios" quisiéramos procesarlos todos de golpe en Micro-Batches:

@Incoming("encounter-events")
public Multi<Void> consumeBatches(Multi<Message<JsonObject>> stream) {
    return stream
        // Agrupar mensajes en listas de 500ms o hasta que haya 100 mensajes
        .group().intoLists().every(Duration.ofMillis(500), 100)
        .onItem().transformToUniAndConcatenate(messages -> {

            // Agrupamos usando algoritmos para quedarnos el id único más reciente
            Map<String, Message<JsonObject>> latestPerEncounter = extractLatest(messages);

            // Actualizar Dapr sólo 1 vez por cada encounter en el batch
            return updateDaprWorkflows(latestPerEncounter).replaceWithVoid();
        });
}

Ventajas: - Retiene todos los eventos intermedios, lo que sirve si debemos reenviarlos a otro sitio (por ejemplo, insertar en Elasticsearch masivamente o BigData).

Opción C: Throttling de la Capa HTTP de Webhooks (Backpressure)

Otra preocupación que mencionas es la saturación. Si disparamos Webhooks Dinámicos por cada pulsación individual, los sistemas receptores de la plataforma (Bots de IA, CRMs, APIs externas) sufrirán un ataque de denegación de servicio no intencionado (DDoS).

Mutiny maneja nativamente la presión a través del backpressure (el consumidor de base no saca más mensajes de la cola AMQP de los que el Thread Worker es capaz de procesar asíncronamente con .onItem().transformToUniAndConcatenate()).


3. Contraste de Enfoques: Backpressure Externo vs. Actor Nativo

Para entender por qué se utiliza un embudo (Mutiny) externo en lugar de dejarle el control total al orquestador, debemos contrastar dos modelos de diseño en Dapr:

A) El Modelo "Mutiny Proxy + Dapr Workflow" (Seleccionado)

  • Flujo: RabbitMQ -> Mutiny (en memoria) -> debounce(1s) -> gRPC -> Dapr Workflow.
  • Comportamiento: Mutiny absorbe las 400 puestas en su propio heap de memoria de la JVM. Desecha 399 y envía 1. Dapr Workflow recibe un único Signal por segundo y guarda 1 evento en su tabla de Event Sourcing.
  • Ventajas: El disco de la base de datos de Dapr no sufre. Mantenemos la magia matemática de los Timers a largo plazo (días, meses) que nos regala la estructura Workflow as Code.
  • Desventajas: El flujo está fragmentado (una parte en mensajería reactiva y otra en el Workflow).

B) El Modelo "Dapr Actor Puro" (Patrón Buzón / Mailbox Completo)

Si decidiéramos que el Actor sea el amo absoluto y reciba directamente los 400 eventos desde RabbitMQ sin filtrado previo: * Flujo: RabbitMQ -> Dapr Actor (vía method invocation). * Comportamiento: El Dapr Actor encola los 400 requests en su buzón local y comienza a procesarlos secuencialmente, uno por uno. Como el Dapr Actor no es Event Sourced (a diferencia del Workflow), simplemente actualiza su estado en memoria (stateManager.set()). Al final de cada iteración del bucle, decidirá si salva a disco (stateManager.save()). Si sólo salva cada cierto tiempo o al quedarse sin mensajes, el I/O se mantiene muy bajo. * Ventajas: Código 100% cohesionado en un solo lugar (El Actor). * Desventajas: Para lanzar una acción dentro de 24 horas, un Dapr Actor te obliga a usar Reminders explícitos (actor.registerReminder(...)), crear métodos callback (receiveReminder), e inyectar lógica de retries si la red falla en el segundo 24h. Es un regreso al boilerplate estilo Pekko PersistentActor, perdiendo la linealidad narrativa del motor de Workflow.

Resolución: Se prioriza la legibilidad a largo plazo ("Workflow as Code") que aporta Dapr Workflows en combinación con el debouncing asíncrono de Mutiny, relegando la I/O masiva a la capa de memoria de Quarkus.


4. Conclusión Estratégica

Para el escenario de tu prueba de estrés (400 puestas en segundo), he aquí las principales conclusiones confirmadas por Quarkus Mutiny Caching y Dapr:

  1. El Flood Test rompería la DB de persistencia: Dapr usa State Stores, y emitir 400 señales por segundo para el mismo UUID a un mismo Workflow resultaría catastrófico a nivel base de datos (too many connections o bloqueo de bloqueos transaccionales OptimisticConcurrencyException).
  2. Mutiny como escudo Non-Blocking: Las colisiones de concurrencia a nivel computación pura no son problema. Quarkus y Mutiny no crean 400 hilos por cada 400 mensajes; todo se empaqueta en Event-Loops eficientemente. El cuello de botella es I/O contra Dapr.
  3. Siguiente Acción: Refactorizar el EncounterEventConsumer usando transformaciones en streams de la Familia de Patrones de Absorción: ya sea a nivel debounce() o una caché caffeine local TTL=1s que actúe de proxy antes de enviar el evento gRPC a dapr-sidecar.