"""Minimal Kafka consumer that prints the coordinates of each JSON message."""

import json
from time import sleep
from typing import Any, Optional

from kafka import KafkaConsumer

# --- Configuration ---
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
TOPIC_NAME = 'my-test-topic'
# Important for parallelism and offset tracking.
CONSUMER_GROUP_ID = 'my-python-consumer-group2'


def _deserialize_value(raw: Optional[bytes]) -> Any:
    """Decode a message value from UTF-8 bytes and parse it as JSON.

    Args:
        raw: The raw message value, or None when the record has no value.

    Returns:
        The parsed JSON object, or None when ``raw`` is None or cannot be
        decoded/parsed.
    """
    if raw is None:
        # Records without a value would otherwise fail on ``.decode``.
        return None
    try:
        return json.loads(raw.decode('utf-8'))
    except ValueError as exc:
        # UnicodeDecodeError and json.JSONDecodeError both derive from
        # ValueError. Report and skip instead of letting one bad payload
        # abort the whole consume loop.
        print(f"Skipping undecodable message value ({exc}): {raw[:100]!r}")
        return None


def create_consumer() -> Optional[KafkaConsumer]:
    """Create a KafkaConsumer subscribed to TOPIC_NAME.

    Returns:
        The consumer instance, or None if it could not be created (the
        error is printed).
    """
    try:
        consumer = KafkaConsumer(
            TOPIC_NAME,  # Topic(s) to subscribe to
            bootstrap_servers=KAFKA_BROKER,
            # 'earliest' to read from the beginning, 'latest' for new
            # messages only.
            auto_offset_reset='earliest',
            group_id=CONSUMER_GROUP_ID,
            value_deserializer=_deserialize_value,
            # Optional settings:
            #   consumer_timeout_ms=1000      stop iterating after 1s idle
            #   enable_auto_commit=True       (default)
            #   auto_commit_interval_ms=5000  (default)
        )
    except Exception as e:
        print(f"Error creating Kafka Consumer: {e}")
        return None
    print("Kafka Consumer created successfully. Subscribed to topic:", TOPIC_NAME)
    return consumer


def main() -> int:
    """Consume messages until interrupted.

    Returns:
        0 on a clean stop (including Ctrl+C), 1 if the consumer could not
        be created or consumption failed with an unexpected error.
    """
    consumer = create_consumer()
    if consumer is None:
        return 1

    print("Listening for messages... (Press Ctrl+C to stop)")
    try:
        for message in consumer:
            # Each message also exposes message.key and message.value.
            print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
            # If auto commit is disabled, commit offsets manually here:
            # consumer.commit()
    except KeyboardInterrupt:
        print("\nConsumer interrupted by user.")
    except Exception as e:
        print(f"An error occurred during consumption: {e}")
        return 1
    finally:
        consumer.close()
        print("Kafka Consumer closed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())