Apache Kafka es una plataforma de transmisión de eventos distribuidos de código abierto utilizada por miles de empresas para canalizaciones de datos de alto rendimiento, análisis de transmisión, integración de datos y aplicaciones de misión crítica.
Permite no solo publicar, almacenar y procesar flujos de eventos de forma inmediata, sino también suscribirse a ellos. Está diseñada para administrar los flujos de datos de varias fuentes y enviarlos a distintos usuarios.
Es una alternativa a RabbitMQ o Apache ActiveMQ.
¿Para qué sirve?
- Procesamiento de datos en tiempo real: Kafka permite ingerir y procesar flujos continuos de datos, como logs, métricas, eventos de usuarios o transacciones.
- Mensajería asíncrona: Actúa como intermediario entre sistemas, permitiendo que productores envíen datos y consumidores los procesen sin necesidad de estar sincronizados.
- Integración de sistemas: Conecta aplicaciones y servicios heterogéneos, como bases de datos, sistemas de monitoreo o herramientas de análisis.
- Almacenamiento temporal de datos: Los datos se almacenan en tópicos (topics) durante un tiempo configurable, permitiendo a los consumidores leerlos cuando lo necesiten.
- Escalabilidad: Soporta grandes volúmenes de datos y puede escalar horizontalmente añadiendo más nodos al clúster.
- Tolerancia a fallos: Su arquitectura distribuida asegura que los datos estén replicados y disponibles incluso si algún nodo falla.
Componentes principales:
- Productores: Envían datos a tópicos.
- Consumidores: Leen datos de tópicos.
- Tópicos: Categorías donde se almacenan los datos, divididos en particiones para paralelismo.
- Brokers: Servidores que forman el clúster de Kafka y gestionan los datos.
- ZooKeeper: Gestiona la coordinación y configuración del clúster (aunque las versiones más recientes permiten operar sin ZooKeeper usando KRaft).
Casos de uso comunes
- Análisis en tiempo real: Procesar datos de IoT, como sensores en dispositivos o vehículos.
- Gestión de logs: Recopilar y procesar logs de aplicaciones o servidores (ej. usado con ELK Stack).
- Sistemas de recomendación: Enviar eventos de usuarios a motores de recomendación en plataformas como Netflix o Spotify.
- Pipelines de datos: Mover datos entre sistemas, como de una base de datos a un data lake o almacén de datos.
- Microservicios: Facilitar la comunicación entre microservicios en arquitecturas modernas.
Como vimos con RabbitMQ, se usa en aplicaciones tipo Productor(emisor) - Consumidor(receptor).
Nota curiosa: El nombre viene del escritor Franz Kafka, famoso por escribir La Metamorfosis. De nada.
Instalando Apache Kafka
Es necesario contar con el JDK del 11 al 17 de preferencia. Debemos descargar la última versión de Apache Kafka:
https://kafka.apache.org/downloadsTambién se puede hacer uso de una imagen de Docker. En este caso nosotros no la ocuparemos.
Instalación en Linux
Una vez descargado debemos descomprimirlo y ubicarnos en el directorio:
$ tar -xzf kafka_2.13-3.9.1.tgz $ cd kafka_2.13-3.9.1
Inicia ZooKeeper con el script proporcionado:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Esto inicia ZooKeeper en el puerto predeterminado 2181. Déjalo corriendo en una terminal.
En otra terminal, inicia el servidor de Kafka:
$ bin/kafka-server-start.sh config/server.properties
Esto inicia un broker en el puerto predeterminado 9092. Déjalo corriendo.
Instalación en Windows
Una vez descargado debemos descomprimirlo y ubicarnos en el directorio.
Inicia ZooKeeper con el script proporcionado:
$ bin\windows\zookeeper-server-start.bat config\zookeeper.properties
Esto inicia ZooKeeper en el puerto predeterminado 2181. Déjalo corriendo en una terminal.
En otra terminal, inicia el servidor de Kafka:
$ bin\windows\kafka-server-start.bat config\server.properties
Esto inicia un broker en el puerto predeterminado 9092. Déjalo corriendo.
Probando Apache Kafka
Crea un tópico llamado test-topic para probar:
En Linux:
$ bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
En Windows:
$ bin\windows\kafka-topics.bat --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Ahora probamos. Asegúrate de que los puertos 2181 (ZooKeeper) y 9092 (Kafka) estén libres.
Linux
Productor:
$ bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
Escribe mensajes (ej. "Hola, Mundo") y presiona Enter.
Consumidor:
$ bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
Windows
Productor:
$ bin\windows\kafka-console-producer.bat --topic test-topic --bootstrap-server localhost:9092
Consumidor:
$ bin\windows\kafka-console-consumer.bat --topic test-topic --bootstrap-server localhost:9092 --from-beginning
Para salir presionamos la combinación de teclas Ctrl+C.
Apache Kafka & Python
Crearemos una aplicación productor y un consumidor. Para ello es necesario isntalar esta librería:
$ pip install confluent-kafka
Asegúrate de tener un servidor de Kafka corriendo (puedes usar Docker o una instalación local). Por ejemplo, un broker en localhost:9092. Crea un tópico en Kafka llamado test-topic (puedes usar la herramienta kafka-topics.sh o kafka-topics.bat).
Productor (productor.py) :
from confluent_kafka import Producer # Configuración del productor conf = { 'bootstrap.servers': 'localhost:9092', # Dirección del broker de Kafka } # Crear el productor producer = Producer(conf) # Callback para confirmar la entrega del mensaje def delivery_report(err, msg): if err is not None: print(f'Error al enviar mensaje: {err}') else: print(f'Mensaje enviado a {msg.topic()} [{msg.partition()}]') # Enviar el mensaje "Hola, Mundo" topic = 'test-topic' message = 'Hola, Mundo' producer.produce(topic, value=message, callback=delivery_report) # Esperar a que los mensajes se envíen producer.flush()
Consumidor (consumidor.py) :
from confluent_kafka import Consumer, KafkaError # Configuración del consumidor conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'my-group', # Grupo de consumidores 'auto.offset.reset': 'earliest' # Leer desde el inicio del tópico } # Crear el consumidor consumer = Consumer(conf) # Suscribirse al tópico topic = 'test-topic' consumer.subscribe([topic]) # Leer mensajes try: while True: msg = consumer.poll(timeout=1.0) # Esperar 1 segundo por mensajes if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: print('Fin de la partición') else: print(f'Error: {msg.error()}') else: print(f'Mensaje recibido: {msg.value().decode("utf-8")}') break # Salir después de recibir el mensaje except KeyboardInterrupt: pass finally: consumer.close()
Ejecutamos el consumidor:
$ python consumidor.py
Ahora ejecutamos el productor:
$ python productor.py
Salida del productor:
Mensaje enviado a test-topic [0]
Salida del consumidor:
Mensaje recibido: Hola, Mundo
Apache Kafka es ideal para sistemas que necesitan manejar grandes flujos de datos en tiempo real, garantizando escalabilidad, confiabilidad y flexibilidad en la integración de datos.
Enlaces:
https://kafka.apache.org/https://activemq.apache.org/
Comentarios
Publicar un comentario