Investigación: Alta Concurrencia y Agregación (Debounce/Conflation) en Quarkus Mutiny¶
Este documento analiza el escenario propuesto: un paciente enviando >400 mensajes por segundo desde la UI (dkv-client), generando un "event flood" (inundación de eventos) en RabbitMQ, y cómo esto impacta a nuestra arquitectura actual basada en Quarkus y Dapr Workflows.
1. El Problema Base: Impacto en Dapr Workflows¶
Actualmente, el EncounterEventConsumer intercepta cada mensaje AMQP y evalúa si debe interactuar con Dapr. En el escenario de 400 mensajes / segundo de un mismo paciente (mismo encounterId):
- Si creamos Workflows nuevos por cada mensaje: Enviaremos 400 peticiones gRPC/s al Dapr Sidecar. Se instanciarán 400 workflows independientes que intentarán lanzar notificaciones simultáneas. Esto saturará la base de datos (State Store) de Dapr rápidamente.
- Si el Workflow ya existe y usamos Dapr "Raise Event" (Signals): Dapr Durable Task se basa en Event Sourcing. Cada vez que envías una señal externa a un workflow para decirle "el paciente ha escrito, resetea el timer de inactividad", Dapr escribe una nueva fila en el historial de ese workflow en disco. ¡400 eventos/s generarán un historial masivo en milisegundos, degradando la memoria al hacer "replay" del workflow!
[!WARNING] Resulta vital no derivar eventos de "alta frecuencia innecesarios" al motor de orquestación de Dapr. Dapr es excelente gestionando estado y timers a largo plazo (horas/días), pero no está diseñado para ingerir ráfagas continuas de alta frecuencia para la misma entidad en tiempo real.
La solución debe implementarse antes de llegar a Dapr, en la capa de mensajería (SmallRye / Mutiny).
2. Técnicas de Agregación en Quarkus (SmallRye Mutiny)¶
SmallRye Reactive Messaging junto con Mutiny ofrecen herramientas sofisticadas de manejo de tiempo y backpressure para colapsar (conflate) o agrupar streams reactivos.
A continuación, los tres enfoques principales para resolver este reto:
Opción A: KeyedMulti + Debounce (Recomendado)¶
En lugar de procesar los eventos de RabbitMQ uno por uno, la especificación de SmallRye permite particionar el flujo de eventos según una clave y aplicar un retraso de silencio (debounce).
@Incoming("encounter-events")
@Outgoing("throttled-events") // Solo los eventos "filtrados" pasan
public Multi<EncounterEvent> throttleMessages(KeyedMulti<String, JsonObject> keyedStream) {
// La infraestructura extrae el encounterId y procesa cada grupo de manera independiente
return keyedStream.onItem().transformToMultiAndMerge(encounterGroup ->
encounterGroup
// El operador mágico: Se traga los eventos que lleguen
// con menos de 1 segundo de diferencia del anterior.
// Si el paciente escribe 400 veces en 1s, sólo emite el ÚLTIMO evento tras silenciarse 1s.
.debounce().ofDuration(Duration.ofSeconds(1))
.onItem().transform(json -> json.mapTo(EncounterEvent.class))
);
}
// Otro método procesa los eventos calmados y llama a Dapr
@Incoming("throttled-events")
public CompletionStage<Void> consumeThrottled(Message<EncounterEvent> msg) {
// Aquí invocamos a Dapr con total seguridad (máx 1 evento/segundo por chat)
}
Ventajas: - Cancela por completo la carga de Workflows "basura". - Resuelve exactamente tu problema (400 pulsaciones/s se reducen a 1 solo evento emitido temporalmente). - Gestión en memoria altamente optimizada (uso de threads del Event-Loop original en Vert.x).
Opción B: Batching / Buffering (Ventaneado temporal)¶
Si en vez de "eliminar eventos intermedios" quisiéramos procesarlos todos de golpe en Micro-Batches:
@Incoming("encounter-events")
public Multi<Void> consumeBatches(Multi<Message<JsonObject>> stream) {
return stream
// Agrupar mensajes en listas de 500ms o hasta que haya 100 mensajes
.group().intoLists().every(Duration.ofMillis(500), 100)
.onItem().transformToUniAndConcatenate(messages -> {
// Agrupamos usando algoritmos para quedarnos el id único más reciente
Map<String, Message<JsonObject>> latestPerEncounter = extractLatest(messages);
// Actualizar Dapr sólo 1 vez por cada encounter en el batch
return updateDaprWorkflows(latestPerEncounter).replaceWithVoid();
});
}
Ventajas: - Retiene todos los eventos intermedios, lo que sirve si debemos reenviarlos a otro sitio (por ejemplo, insertar en Elasticsearch masivamente o BigData).
Opción C: Throttling de la Capa HTTP de Webhooks (Backpressure)¶
Otra preocupación que mencionas es la saturación. Si disparamos Webhooks Dinámicos por cada pulsación individual, los sistemas receptores de la plataforma (Bots de IA, CRMs, APIs externas) sufrirán un ataque de denegación de servicio no intencionado (DDoS).
Mutiny maneja nativamente la presión a través del backpressure (el consumidor de base no saca más mensajes de la cola AMQP de los que el Thread Worker es capaz de procesar asíncronamente con .onItem().transformToUniAndConcatenate()).
3. Contraste de Enfoques: Backpressure Externo vs. Actor Nativo¶
Para entender por qué se utiliza un embudo (Mutiny) externo en lugar de dejarle el control total al orquestador, debemos contrastar dos modelos de diseño en Dapr:
A) El Modelo "Mutiny Proxy + Dapr Workflow" (Seleccionado)¶
- Flujo: RabbitMQ -> Mutiny (en memoria) ->
debounce(1s)-> gRPC -> Dapr Workflow. - Comportamiento: Mutiny absorbe las 400 puestas en su propio heap de memoria de la JVM. Desecha 399 y envía 1. Dapr Workflow recibe un único
Signalpor segundo y guarda 1 evento en su tabla de Event Sourcing. - Ventajas: El disco de la base de datos de Dapr no sufre. Mantenemos la magia matemática de los
Timersa largo plazo (días, meses) que nos regala la estructuraWorkflow as Code. - Desventajas: El flujo está fragmentado (una parte en mensajería reactiva y otra en el Workflow).
B) El Modelo "Dapr Actor Puro" (Patrón Buzón / Mailbox Completo)¶
Si decidiéramos que el Actor sea el amo absoluto y reciba directamente los 400 eventos desde RabbitMQ sin filtrado previo:
* Flujo: RabbitMQ -> Dapr Actor (vía method invocation).
* Comportamiento: El Dapr Actor encola los 400 requests en su buzón local y comienza a procesarlos secuencialmente, uno por uno. Como el Dapr Actor no es Event Sourced (a diferencia del Workflow), simplemente actualiza su estado en memoria (stateManager.set()). Al final de cada iteración del bucle, decidirá si salva a disco (stateManager.save()). Si sólo salva cada cierto tiempo o al quedarse sin mensajes, el I/O se mantiene muy bajo.
* Ventajas: Código 100% cohesionado en un solo lugar (El Actor).
* Desventajas: Para lanzar una acción dentro de 24 horas, un Dapr Actor te obliga a usar Reminders explícitos (actor.registerReminder(...)), crear métodos callback (receiveReminder), e inyectar lógica de retries si la red falla en el segundo 24h. Es un regreso al boilerplate estilo Pekko PersistentActor, perdiendo la linealidad narrativa del motor de Workflow.
Resolución: Se prioriza la legibilidad a largo plazo ("Workflow as Code") que aporta Dapr Workflows en combinación con el debouncing asíncrono de Mutiny, relegando la I/O masiva a la capa de memoria de Quarkus.
4. Conclusión Estratégica¶
Para el escenario de tu prueba de estrés (400 puestas en segundo), he aquí las principales conclusiones confirmadas por Quarkus Mutiny Caching y Dapr:
- El Flood Test rompería la DB de persistencia: Dapr usa State Stores, y emitir 400 señales por segundo para el mismo UUID a un mismo Workflow resultaría catastrófico a nivel base de datos (
too many connectionso bloqueo de bloqueos transaccionalesOptimisticConcurrencyException). - Mutiny como escudo Non-Blocking: Las colisiones de concurrencia a nivel computación pura no son problema. Quarkus y Mutiny no crean 400 hilos por cada 400 mensajes; todo se empaqueta en Event-Loops eficientemente. El cuello de botella es I/O contra Dapr.
- Siguiente Acción: Refactorizar el
EncounterEventConsumerusando transformaciones en streams de la Familia de Patrones de Absorción: ya sea a niveldebounce()o una cachécaffeinelocal TTL=1s que actúe de proxy antes de enviar el evento gRPC adapr-sidecar.