Usando o padrão observer com CDI para lidar com o Kafka

Usando o padrão observer com CDI para lidar com o Kafka
lacerdaph
lacerdaph

Compartilhe

Há pouco tempo passei por um problema que é rotina na vida de todo desenvolvedor: acoplamento. Identificamos que um sistema está acoplado quando é difícil mudar a implementação da solução que foi escolhida para determinado problema, quando uma classe depende de várias outras para funcionar, quando há muito o uso do design patterns RCP, dentre outros.

No meu caso, o problema era retirar mensagens de um servidor de mensageria. Teria que fazer da forma menos acoplada possível, pois a probabilidade de trocar esse servidor é grande. Não deveria nem ter relacionamento com a especificação padrão JMS, haja vista que escolhemos o Kafka como servidor e ele, por uma série de motivos, incluindo desempenho, não segue a especificação.

Talvez haja mais servidores de mensageria do que de aplicação JavaEE, então o analista tem muitos trade-offs para avaliar. De qualquer forma, mesmo que futuramente haja uma troca para um servidor JMS, as partes dos sistema que usam esse serviço não devem ser alteradas, aí que está o desafio do acoplamento.

The more coupling we have between objects, components, modules, or systems, the more we experience many consequences. These consequences include but are not limited to difficulty of modification, propagation of failure, inability to scale because of contention, and performance issues due to dependent actions.

Encapsulando a API.

A API do Kafka é bem tranquila de usar, baseada em Producer e Consumer, portanto o primeiro desafio era encapsular o uso em uma interface mais simples ainda. Outro problema aparece, o encapsulamento. Nenhuma das partes do sistema devem saber sobre os detalhes de como foi implementado o serviço de mensageria, devem apenas saber o mínimo possível para utilizá-lo.

 public interface ServicoMensageria { public void assinar(String topico);

public void enviarMensagem(String topico, String mensagem); } 

Da interface acima, poderíamos diminuir ainda mais, como omitir método assinar, identificando que o contexto da aplicação foi inicializado e fazendo a assinatura automática.

Agora basta usar um framework de injeção de dependências para usar o serviço de mensageria, sem saber quem é o responsável por prover a implementação.

 @Inject private ServicoMensageria servicoMensageria;

public void envia(){ servicoMensageria.enviarMensagem("meutopico","mensagem") } 

O padrão Observer

Agora passamos para o próximo problema, o cliente da API já fez a assinatura de um fila de mensagens e precisa ser notificado quando houver alguma mensagem na fila. Para isso, nada mais adequado do que implementar o padrão Observer. Ao invés de implementar o padrão, usei uma api que já implementa, inclusive discutida pelo Sérgio Lopes anteriormente, o CDI.

 

 public class OuvidorMensageriaNFe {

/\*\* \* \* SerieMensagens é uma wrapper que encapsula uma List <Mensagem>; \*\*/ public void ouvindoMensagens(@Observes SerieMensagens serieMensagens) {

System.out.println("mensagem recebida "+ serieMensagens); } } 

Lidando com o sincronismo.

Agora precisamos retirar mensagens da fila e produzir eventos do CDI. Só que aqui temos um sério problema: a API de Consumer do Kafka fica em um loop infinito while true e caso haja mensagem um evento deve ser disparado.

 try (KafkaConsumer<String, String> consumer = new KafkaConsumer<> (consumerProperties);) {

consumer.subscribe("meutopico");

while (true) {

List < Mensagem > mensagens = new ArrayList <> (); ConsumerRecords < String, String > records = consumer.poll(pollPadrao);

for (ConsumerRecord < String, String > record : records) {

mensagens.add(new Mensagem(record.topic(), record.value())); } if (!mensagens.isEmpty()) {

//cuidado redobrado aqui evento.fire(new SerieMensagens(mensagens));

mensagens = new ArrayList<>(); }

} } 

O problema aqui é que essa solução não escala e será muito lenta, devido a basicamente um limitador: o CDI só trabalha com eventos síncronos, ou seja, quando a Thread chega na linha evento.fire(), ela só passa para o próximo passo quando todos os responsáveis por ouvir o evento @Observer SerieMensagens terminam o seu processamento. Há como contornar esse "problema", tanto no produtor do evento como no observador do evento. Neste, poderíamos utilizar o Asynchonous e naquele bastaria lançar o evento em uma thread separada.

 


if (!mensagens.isEmpty()) {

final List < Mensagem > mensagensParaEnviar = new ArrayList <> (mensagens); new Thread(() -> { evento.fire(new SerieMensagens(mensagensParaEnviar));}).start();

mensagens = new ArrayList <> (); }

Concluindo.

Pensar em baixo acoplamento, alta coesão e encapsulamento é problema diário de qualquer desenvolvedor. Você pode até não perceber de imediato, mas o preço da manutenibilidade chega logo (Design payoff). Por fim, há várias outras features do CDI que você deveria ter no seu leque de opções e talvez no futuro, a invocação de métodos assíncronos faça parte da especificação. Se bem que o JavaEE 8 não está tão perto assim de sair, no momento passa por uma crise interna.

Veja outros artigos sobre Programação