Primeiras aulas do curso Kafka: Fast delegate, evolução e cluster de brokers

Kafka: Fast delegate, evolução e cluster de brokers

Novos produtores e consumidores - Introdução

Bem-vindo a mais um concurso de Kafka, nesse curso vamos criar diversos serviços e extrair alguns serviços para trabalhar num ambiente onde a arquitetura distribuída - cada um desses serviços que podem rodar na mesma máquina ou em máquinas distintas; cada um deles pode rodar uma ou mais vezes em paralelo, permitindo que paralelizar muito do nosso trabalho.

Também vai ter questões em momentos em que se queira serializar o processamento e faremos isso. Veremos o ponto de falha do Kafka quando tem um único broker como que pode vencer isso através de um cluster de broker.

Não só os nossos serviços são rodados diversas vezes em um cluster, mas também os brokers do Kafka, para isso vai ser importante de entender como que é um tópico é armazenado dentro do Kafka, como que pode fazer a replicação desse tópico, baseado nas informações que um líder.

Um líder recebe as informações de escrita e replica isso para suas replicas que vão estar em sincronia então com o líder e isso faz com que quando líder cai, réplicas caem e você consiga ter mais garantias de que as informações estão armazenadas e serão em algum momento consumidas.

Isto é, garantimos mais reliability e obteremos mais realiability ao mesmo tempo de paralelização, quando quisermos serialização em sequência, uma atrás da outra, tudo isso conseguiremos ganhar ao mesmo tempo a medida que vamos configurando Kafka pensando nessa nossa arquitetura, seja na hora que recebe requisição http.

Seja na hora que escreve no banco de dados que vai implementar, ou seja, no miolo dos nossos serviços. Veremos tudo isso acontecendo nesse curso. Se você não fez o nosso primeiro curso de Kafka, sugiro fazer ele antes ou dar olhada se você tem conhecimento dele e baixar o conteúdo do código do Primeiro Curso que você verá nas atividades da primeira aula.

Novos produtores e consumidores - Produtores consumidores e o eager de patterns

Então vou começar, eu vou continuar o projeto do outro curso, se você já executou todo o outro curso tem o mesmo código que eu; se não você pode baixar do hub conforme os links da atividade.

Eu vou dar um open no projeto, vou escolher o diretório onde está o projeto, projeto atual, vou abri-lo e tenho-o carregado com o final do último curso. Esse projeto é um sistema de e-commerce que está simulando o fim da venda, o processo da venda.

A pessoa gerou uma venda. Então eu tenho um serviço que gera 10 vendas para testar e essas 10 vendas geram 20 mensagens 10 de e-mail e 10 de novas vendas, novos pedidos de venda. Esses pedidos de venda passam por um detector de fraude e o e-mail para service mail. Todas as mensagens são logados no service log.

Repara que tem serviços enviando e serviços escutando mensagens. Vamos agora misturar as duas coisas, mostrar que não tem desafio em misturar - enviar e receber mensagens no mesmo serviço.

Vamos pegar o serviço de detecção de fraudes que ele recebe a mensagem de um novo pedido, se ele recebe mensagem de um novo pedido no Java dele ele espera 5 segundos e a ordem foi processada - deu certo ou não deu não importa.

Eu queria simular uma situação onde em alguns casos esse nosso pedido foi aprovado e em outros não, teve uma fraude. Quando gera os pedidos de compra, o valor é um valor de um a cinco mil mais ou menos.

Você poder fazer assim, se valor for acima de quatro mil e quinhentos recusa. Na prática se você está usando uma inteligência artificial, algoritmo que for você vai deixar seu estimador nesse serviço, a equipe da machine learning implementa e deixa aqui.

Vou fazer uma regra simples para tentar identificar a fraude - a fraude é seu pedido é muito caro, é fraude não. Isso não é uma coisa do mundo real esse detector de fraude, que não é o foco desse curso. Temos outros cursos onde o foco é machine learning, aprendizado de máquinas e várias outras coisas.

Esperei esses 5 segundos simular esse algoritmo lerdo, e quero pegar essa order, acessá-la no record, o var order = record.value dela devolve uma ordem para mim. Agora eu posso fazer o que eu quiser com essa order. Será que o valor dessa order, order.get, get o que? O que eu tenho aqui.

Não tem nenhum get aqui. Nem para saber, se eu quiser acessar o valor não tem nesse modelo. Mas você gostaria de colocar o get do amount aqui, deixa acessível amount, não tem problema nenhum. Ou poderia implementar métodos, do do tipo verifica se é fraude, poderia fazer isso e esse modelo deixa de ser anêmico.

Poderia fazer isso se eu quisesse e não tem problema nenhuma. Então o que eu vou querer fazer é criar um getter aqui - tem várias maneiras de criar com IntelliJ, cada ferramenta que você usa linguagem tem propriedades e algo do gênero. No meu caso digitando get ele gera o getAmount.

Poderia até deixar como público já que o BigDecimal é imutável e ele é final não teria esse problema, mas não é considerada uma boa parte gero getter somente para que eu preciso. O meu get amount eu gostaria de comparar, gostaria de saber se ele é maior ou menor. Então queria saber se é greater ou lesser, mas não tem. No BigDecimal o que tem é compareTo.

Eu passo outro BigDecimal, vou passar com aspas que eu tenho a precisão de que seja “4500” exatamente. Se for maior ou igual a zero, quer dizer que o preço é muito alto é maior que 4500, pretending the fraud happen when the amount is >= 4500, maior ou igual a 4500.

Nesse caso é teve um fraude. Eu acho legal mesmo que a lógica seja super simples, extrair em algo. Um método, algo. No IntelliJ Refractor Extract Method, quero saber se é fraude isFraud.

Aproveitaria o método de fraude aqui no caso no meu serviço ou nas classes que fizerem sentido para o seu serviço. Se for uma Fraude que eu vou fazer? Em vez do colocar order processed, vou colocar order is a fraud.

Se não ela não é, quer dizer ela não é uma fraude, Sys out, System.out eu vou falar approved: + order, posso colocar aqui aprovado. Repara que agora a minha classe order é diferente da classe order que envia.

Não tem problema, porque o processo de serialização e desserialização só usa os campos. Lembra que eu falei importância no outro curso, de que o processo de serialização e desserialização pode ser feita de diversas maneiras, da maneira que estou fazendo agora se um serviço precisa de certos métodos e outro de outro não tem problema nenhum.

Não precisa ficar colocando dependências nos dois, misturando e “sujando” um projeto com coisa do outro projeto. Eles são isolados, tem um get que é um método simples, ainda continua anêmico etc., mas poderia colocar outros métodos.

Vamos ver isso acontecer de verdade? Vou rodar o meu fraud detector, run fraud detector, vou agora no meu New Order Main, vou tentar rodar também, “Ctrl + Shift + R”, o diretório que estava errado. Vou deixar rodar.

Vou rodar primeiro meu fraud detector “Ctrl + Shift + R”, ele diz que tem erro no módulo que eu não estou especificando, vamos procurar aqui o módulo. E aí eu vou escolher aqui o módulo do fraud detector, legal. Está rodando fraud detector e New Order Main a mesma coisa vou rodar ele deve reclamar também do módulo.

Vou mandar 10 mensagem de e-mail, 10 mensagens de New Order e vemos as new orders chegando e checando pela fraude. Checou pela fraude aprovou essa, checou aprovou e ele vai aprovando ou recusando de acordo com o valor, ele vai imprimindo.

É uma fraude e quando é sucesso ele fala aprovado, só que ele não está falando dados de aprovado ou os dados do que foi uma fraude, eu queria imprimir aqui, quando imprimo realmente a ordem, eu queria substituir também o to string, poder mostrar a order por completo então no caso do Java pode gerar to string muito fácil aqui.

Cada linguagem, cada ferramenta vai ter uma maneira - é só um “Ctrl + N” to string ele gera com os três valores, nosso FraudDetectorService posso até pará-lo no meio do que ele está rodando, vou rodar de novo, talvez ainda tem alguma mensagem para ser processada, vai processar, caso contrário gera mais mensagem.

Assim que ele levantar, está levantando, parece que não tem mensagem nova. Então vou mandar mensagem nova, as 10 mensagens e vai começar a ver agora as orders corretas. Então essa daqui que o amount é 2716 tem ser aprovada e depois tem outra 2500 aprovada. E por aí vai, cada uma delas vai sendo aprovada ou recusada.

O que eu queria era que o meu serviço além de ter tem essa vontade de evoluir independentemente do outro serviço, eu posso agora enviar mensagens também. Além de eu receber mensagens, eu gostaria de enviar mensagem. Se eu quero enviar mensagem preciso de um Kafka Dispatcher.

Como estamos dentro do FraudDetectorService vou colocar private final KafkaDispatcher que ele vai despachar se é o pedido que foi aceito eu poderia despachar o pedido, a minha order. Vou despachar a minha order, uso KafkaDispatcher.

OrderDispatcher é meu KafkaDispatcher do tipo order, como já está definindo aqui o tipo, não precisa do tipo aqui e temos um dispatcher agora. Com order dispatcher eu posso despachar mensagens, se foi, por exemplo, recusado OrderDispatcher.send, tópico e-commerce, minha order fraudulenta, então foi rejeitada, reject. Poderia ser fraude, o que fosse, a chave que estou ID do usuário get, não tem o ID do usuário, user ID, e qual que é o objeto que vai enviar? A própria order.

Faltou criar o get user ID, vamos lá no nosso order, o getUserId, criei, ele está aqui. Tanto no envio do ecommerce rejected e do ecommerce order approved, nesse caso foi aprovada a minha order. [11:30] Ele está reclamando das exceptions, eu tenho que jogar as exceptions que podem ocorrer, adiciono as exceptions na minha função. Dá um erro, porque a função joga exception, a nossa função de passa joga exception. O KafkaService recebe um consumer function que não pode jogar exception.

tem que tomar cuidado. vamos ter de adicionar as exceptions aqui. Vou adicionar o mínimo, prefiro ir sempre pelo caminho do mínimo, vai adicionando na medida do necessário. [12:09] “Ctrl + enter” para adicionar na assinatura, então agora no ConsumerFunction pode dar ExecutionException ou InterruptedException, vamos saber o que fazer. Dá uma olhada no KafkaService, quando chama consume quer tratar esse erro. Se deu esse erro, preciso tratar.

De alguma maneira tratar. Ou eu jogo exception ou eu paro meu serviço completamente, o caso da exception ou eu trato a exception para essa mensagem e a próxima eu continuo trabalhando.

Uma maneira de lidar, é uma das maneiras de lidar. tem várias opções para fazer o tratamento, por enquanto você só vai logar. então, so far, Just logging the exception for this message, em algum lugar armazenar as mensagens que deram erro. Poderia não comentar essa mensagem e deixa tentar de novo, enquanto está dando exception, tem várias maneiras de lidar com esse erro e focar na hora de falar apenas de tratamento de erro.

Por enquanto que eu queria era ser capaz de um serviço que recebe, também enviar mensagens e é isso que eu estou fazendo. Mas o que falta? Rodar o log, vou restartar FraudDetectorService, vou abrir meu LogService e rodar para ver as mensagens todas, ele reclama do módulo, service log e FraudDetectorService rodando e enviamos as 10 mensagens.

Quando envia, envia 10 e-mail, 10 da compra, do pedido de compra que chega aqui. Esses 10 do pedido de compra vão virar 10 novas - seja de aprovação ou de rejeição. Então 30 mensagens enviadas de um lado para o outro, rodando NewOrderMain, todas elas chegam aqui do send email, todas do FraudDetector chegam no log, daqui a pouquinho FraudDetector começa a rodar começa a rodar e fala enviei.

E o LogService não está recebendo, um cuidado muito importante quando está trabalhando com o LogService com pattern; quais são os subjects que ele está escutando não é dinâmico, não é que enquanto ele está escutando se surgiu subject novo ele escuta, não.

Ele começa a escutar o subject na hora que você roda ele, os subjects que têm, que servem esse padrão são subjects saber se ele vai escutar. Se surgir um novo subject que segue esse padrão, não vai escutar e surgiu um novo subject que não existia antes, dois novos surgiram e foram enviados.

Isto é, ele não tava escutando. Mas agora como esses tópicos já existem podemos rodar de novo o LogService e rodar o NewOrderMain.

Agora sim LogService vai pegar todas as mensagens dos tópicos que já existem, que são aqueles dois de ecommerce send mail e outro e do e-comerce approved e do e-commerce rejected (esse tem de esperar ocorrer aqui, de vez em quando acontece); fizemos um consumidor que também é produtor.

Novos produtores e consumidores - Um serviço que acessa bancos externos

Nosso próximo passo é criar um novo serviço. Eu queria mostrar agora essa questão da independência dos projetos, tem uma certa dependência de acordo com a estrutura do esquema das mensagens que são enviadas aquele JSON enviado de um lado para o outro.

Porém as nossas faces, nossas dependências internas ao serviço são independentes. Vamos observar serviço novo, que vai utilizar algo a mais: um banco de dados. Um outro serviço externo, eu quero criar um novo serviço que toda vez que vem uma mensagem de um pedido de compra novo, se o usuário é uma pessoa nova, tem um e-mail novo eu vou inserir esse usuário no banco.

A maneira de fazer isso é criar um serviço que representa o banco de usuários, onde teria as informações pessoais dos usuários e eu não gostaria que todos serviços acessassem.

Eu vou criar um novo módulo, que você já conhece. Esse módulo eu vou chamar de service-users, que é onde estão os usuários, criei e tenho o meu projeto. Se eu vou utilizar uma quantidade eu preciso de banco de dados e o que vamos utilizar se chama mvnrepository sqlite.

Quero a versão sqlite para o Java, então vou usar versão 3.28.0, vou dar um copy, vou adicionar a dependência no meu service users. Salvou, direitinho e não errado, faltou colocar isso dentro do dependences. Salvou, quero reload do arquivo e como eu já estou com auto reload, maravilha.

Senão venho no Maven, escolho projeto e dou um refresh, baixou a dependência. Posso ir no service users e dentro dele criar uma classe que vai se chamar na br.com.alura.ecommerce, um create user service, um serviço que cria usuários.

Então ele é um cliente user service um serviço que cria usuários, então ele é CreateUserService assim como fraude detector service, ele escuta a mensagem de nova ordem de compra. Eu vou pegar tudo aqui e vou dar um copy paste “feioso” dentro do meu cliente userservice.

Observa primeiro quando vai criar create user service quer usar o KafkaService, mas faltou importar, adicionar dependência do common-kafka, quero usar aqui dentro.

Sem o compile, quero usar para valer, na hora de executar para valer. A classe do nosso serviço é o CreateUserService, aqui é o consumer group. Ele vai consumir uma order.

Order.class faltou a classe order, eu vou copiar a order de envio, porque é a é mais simples de todas para mim, ela que eu preciso que não tem nada; depois se eu precisar de mais coisa cria o que precisar. Dei copy e paste, assim o order não cria dependência, quero simplesmente ter a minha própria cópia de um pedido.

Com os campos, com o que eu quiser. Vou precisar despachar, não! Só vou criar isso no meu banco, processing new order, checking for fraud new user, verificando se é um novo usuário; eu vou imprimir aqui o valor e mais nada.

Está suficiente, não preciso de um Thread.sleep, posso sair executando rapidinho, tenho a order e agora faz alguma coisa com essa order. Apenas corrige os imports, temos o nosso código do CreateUserService, só preciso acessar o banco e fazer alguma coisa.

Agora é a hora em que usamos na nossa biblioteca o acesso ao banco, eu quero criar agora vocês é um serviço que utiliza banco de dados, utiliza um serviço externo – poderia ser enviar e-mail, enviar push notifications, salvar arquivo em disco, seja lá o que for faz algum serviço externo. No meu caso banco de dados externo.

Vou que eu fazer vou criar uma conexão com o banco, vou assumir que esse serviço apenas roda uma única vez e vai implementar dessa maneira Tudo bem então de uma maneira. Os outros serviços dá para rodar quantas vezes quiser. No meu caso com banco de dados se você tivesse rodando outras linguagens remoto, poderia rodar quantas vezes quiser, deixar vários paralelos.

No meu caso eu vou rodar uma sqlite que vai salvar um arquivo em disco que eu só vou ter uma instância rodando, no nosso CreateUserService construtor eu vou querer abrir a conexão com ele, então String url – uma url de conexão com o banco, jdbc:sqlite:users_database.db (ele cria esse arquivo users_database.db).

Vou criar a conexão this.conection =DriverManager.getConnection(url). Você poderia usar ferramenta de outra linguagem, tudo bem, grava no banco. Estou usando jdbc porque quero ir direto ao ponto.

O foco não é o banco de dados avançado, nossa questão é serviço do consumidor e serviço que acessa serviços externos e etc. GetConnection, maravilha, tudo certinho, pode jogar exception.

Quero criar tabela, para isso connection.creatStatement de uma maneira mais simples, como não vai ter concatenação de string e executo-o. Tenho no statement create table Users que vai ter o primeiro campo uuid varchar no campo de texto, vou colocar até 200 caracteres, poderia colocar fixo etc. que é uma chave primária, primary key.

Tenho também o campo que é o e-mail da pessoa vou verificar se o e-mail já existe. Que também vai ser um varchar e eu também vou forçar com 200, esse meu create que eu gostaria de executar; tem de tomar cuidado porque esse create se a tabela já existe eu quero ignorar, a primeira ele cria a segunda tem de tomar cuidado.

Vou deixar dessa maneira primeiro, a segunda vez que rodar vemos o problema acontecer. Vou jogar a exception que tem aqui, add exception to method signature, maravilha. Se eu rodar esse serviço ele deveria ficar escutando e criar essa tabela.

Essas duas coisas ele deveria fazer, vamos rodar, “Ctrl + Shift + R”, está rodando, quando roda o CreateUserService ele está rodando no diretório projeto-atual/ecommerce. Se eu der Synchronize ecommerce vamos ver o arquivo users_database.db.

Ele criou, caso contrário teria dado exception, teria parado, no lugar de criar nesse diretório, vou criar dentro do diretório target, para ficar melhor. Vou dar delete em users_database.db e vou dar “Ctrl + Shift + R”, para rodar de novo.

Vou rodar novamente, vou dar o Synchronize ecommerce, sincronizou os diretórios, se olharmos o target veremos o users_database.db, criou e criou a nossa tabela porque não deu erro. Quando recebo uma nova mensagem, nova compra, novo order, eu quero verificar se já existe esse usuário, com esse e-mail.

Então, if(exists(order.getEmail) se já existe não faço nada, mas eu quero fazer se for um ususário novo, if(isNewUser(oreder.getEmail)), se for um usuário novo com esse e-mail aconteceu algo. Get email não existe, vamos criá-lo. Vai devolver uma string. Por enquanto não tem o e-mail.

Vou ver qualquer coisa, vai ter de receber e-mail o processo de compra, é um novo usuário vou criar nova função, por padrão vou retornar que é novo usuário. Se é um novo usuário quero inserir, então, insertNewUser(order.getEmail), quero implementar as duas funções, a primeira fazer um insert então, connection,prepareStatement, o statement que quero preparar é insert into users.

Campos do user: uuid, email, são os dois campos do usuário que temos. Faltaram os valores, values (?,?), esse é o meu statement, que eu preparei; tenho que jogar uma exception, porque pode dar erro, isso devolve para mim um statement de insert.

Prefiro chamar de insert, seta um string, meu uuid e o segundo insert.setString email, falo insert executa para mim, executou usuário adicionado, sout, Ususário uuid e + email adicionado. Operação simples eu prefiro ir por esse caminho.

Aqui e-mail. Inseri esse código, faltou uuid, vou jogar vou jogar exception e passa a ter um erro em cima, porque o KafkaService recebe ConsumerFunction - que não joga sqlException.

Na prática é raro colocar throws exception, apenas nos momentos em que quer tratar qualquer tipo de exception em qualquer linguagem; esse é o momento em que eu quero. Eu quero que quando eu recebo uma mensagem no meu KafkaService independente da mensagem quero ser capaz de recuperar e ir para outra mensagem.

Então, only catches Exception because no matter which Exception I want to recover and pase the next one, quero pegar a próxima. Todas as outras exceptions, maravilha. O problema que isNewUser, faltou comentar, verificar se é um usuário novo, aqui também quer pegar conexão connection.prepareStatement que é select vou buscar ID que é from Users where o email = ?, estou interessado em trazer um. Limite um.

Esse é o meu correto. É o meu query se existe, exists.setString a primeira é o e-mail que estou procurando, exists.execiteQuery, tenho que jogar exception do tipo SQL e devolve resultados.

Existe se tem próxima linha. Então, results.next se vai para próxima linha é porque existe, não é um usuário novo no banco. Verifico se é novo e insiro o usuário no banco. Isso quer dizer então que quando tem uma nova compra, envio a mensagem com a compra, a pessoa preencheu o site o aplicativo, os dados, o e-mail, dados da compra e enviou, ela gera uuid?

Nem sempre. Ela tem identificador único dela que é o e-mail, mas uuid não acontece na hora da compra, na hora de preencher o formulário, você pode fazer isso em algumas situações; mas no nosso caso, quando faz uma compra não tem uuid apenas tem o e-mail.

Então agora eu tenho um problema que tem que mudar todo o esquema de comunicação de um lado para o outro, porque na verdade a order do New Order, ela tem 10 userId, orderId e amount, não! Ela tem email, orderId e amount.

Isso vai para o nosso CreateUserService, quando o create cria o usuário no banco ou busca o usuário do banco aí se sabe ID dele – se existe ou não existe. Só vai existir uuid do usuário depois do CreateUserService se usuário é um usuário novo.

Esse é um cuidado que eu tenho que tomar, só faz sentido rodar o sistema de fraude e todas as outras coisas depois de ter colocado um usuário no meu banco, é uma decisão que a gente tem que tomar, faz sentido rodar depois ou antes com as informações na mensagem o que faz sentido? Isso é uma decisão que você tem que tomar.

De acordo com a decisão que toma no nosso sistema, as mensagens estarão fazendo um caminho ou outro e é isso que faremos daqui a pouco; adaptar aos nossos esquemas para isso, mas por enquanto já tem aqui um serviço capaz de armazenar dados e buscar dados de um banco e poderia usar qualquer biblioteca de banco.

Sobre o curso Kafka: Fast delegate, evolução e cluster de brokers

O curso Kafka: Fast delegate, evolução e cluster de brokers possui 139 minutos de vídeos, em um total de 24 atividades. Gostou? Conheça nossos outros cursos de Mensageria/Streams em DevOps, ou leia nossos artigos de DevOps.

Matricule-se e comece a estudar com a gente hoje! Conheça outros tópicos abordados durante o curso:

Aprenda Mensageria/Streams acessando integralmente esse e outros cursos, comece hoje!

Plus

  • Acesso a TODOS os cursos da plataforma

    Mais de 1200 cursos completamente atualizados, com novos lançamentos todas as semanas, em Programação, Front-end, UX & Design, Data Science, Mobile, DevOps e Inovação & Gestão.

  • Alura Challenges

    Desafios temáticos para você turbinar seu portfólio. Você aprende na prática, com exercícios e projetos que simulam o dia a dia profissional.

  • Alura Cases

    Webséries exclusivas com discussões avançadas sobre arquitetura de sistemas com profissionais de grandes corporações e startups.

  • Certificado

    Emitimos certificados para atestar que você finalizou nossos cursos e formações.

  • Alura Língua (incluindo curso Inglês para Devs)

    Estude a língua inglesa com um curso 100% focado em tecnologia e expanda seus horizontes profissionais.

12X
R$85
à vista R$1.020
Matricule-se

Pro

  • Acesso a TODOS os cursos da plataforma

    Mais de 1200 cursos completamente atualizados, com novos lançamentos todas as semanas, em Programação, Front-end, UX & Design, Data Science, Mobile, DevOps e Inovação & Gestão.

  • Alura Challenges

    Desafios temáticos para você turbinar seu portfólio. Você aprende na prática, com exercícios e projetos que simulam o dia a dia profissional.

  • Alura Cases

    Webséries exclusivas com discussões avançadas sobre arquitetura de sistemas com profissionais de grandes corporações e startups.

  • Certificado

    Emitimos certificados para atestar que você finalizou nossos cursos e formações.

  • Alura Língua (incluindo curso Inglês para Devs)

    Estude a língua inglesa com um curso 100% focado em tecnologia e expanda seus horizontes profissionais.

12X
R$120
à vista R$1.440
Matricule-se
Conheça os Planos para Empresas

Acesso completo
durante 1 ano

Estude 24h/dia
onde e quando quiser

Novos cursos
todas as semanas