import time
import json

from kafka import KafkaProducer

# --- Configuration ---
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
TOPIC_NAME = 'my-test-topic'
MESSAGES_TO_SEND = 100


def create_producer():
    """Create a KafkaProducer connected to the brokers in ``KAFKA_BROKER``.

    Message values are serialized as UTF-8 encoded JSON and keys as encoded
    strings.

    Returns:
        The configured producer, or ``None`` if construction failed (the
        error is printed rather than raised).
    """
    try:
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BROKER,
            # Serialize values as JSON, then encode to UTF-8 bytes.
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Keys are plain strings; encode them to bytes.
            key_serializer=str.encode,
            # Request acknowledgement from all in-sync replicas.
            acks='all',
            # Retry sends that fail with a transient error.
            retries=5,
        )
    except Exception as e:
        # Top-level boundary for this script: report and let the caller
        # decide what to do with a missing producer.
        print(f"Error creating Kafka Producer: {e}")
        return None
    print("Kafka Producer created successfully.")
    return producer


def _report_send_error(exc):
    """Print a delivery failure reported asynchronously by the producer."""
    print(f"Error sending message: {exc}")


def send_message(producer, topic, message_data):
    """Queue a single message for delivery to ``topic``.

    The send is asynchronous: this function returns once the message has
    been handed to the producer, not once the broker has acknowledged it.
    Delivery failures that surface later are printed through an errback
    attached to the returned future.

    Args:
        producer: Producer exposing ``send(topic, key=..., value=...)``.
        topic: Name of the destination topic.
        message_data: JSON-serializable payload.

    Returns:
        The future returned by ``producer.send`` (call ``.get(timeout=...)``
        on it to block until the broker acknowledges the record), or
        ``None`` if the message could not be queued.
    """
    # The key is derived from the millisecond clock, giving values in
    # client_1000 .. client_1999. Messages sent within the same millisecond
    # therefore share a key.
    client_id = int(time.time() * 1000) % 1000 + 1000
    key = f'client_{client_id}'
    try:
        future = producer.send(topic, key=key, value=message_data)
        # Without an errback, a failure after all retries would be dropped
        # silently because nothing ever inspects the future.
        future.add_errback(_report_send_error)
    except Exception as e:
        print(f"Error sending message: {e}")
        return None
    print(f"Message sent: {message_data}")
    return future


def main():
    """Send ``MESSAGES_TO_SEND`` test messages to ``TOPIC_NAME``.

    Returns:
        Process exit status: 0 on completion (or user interrupt), 1 if the
        producer could not be created.
    """
    producer = create_producer()
    if producer is None:
        return 1

    try:
        for i in range(MESSAGES_TO_SEND):
            message = {
                'id': i,
                'text': f'Hello from Python Kafka Producer! Message #{i}',
                'timestamp': time.time(),
            }
            send_message(producer, TOPIC_NAME, message)

        # Ensure all buffered messages are sent before exiting.
        producer.flush()
        print(f"All {MESSAGES_TO_SEND} messages sent and flushed.")
    except KeyboardInterrupt:
        print("Producer interrupted by user.")
    finally:
        producer.close()
        print("Kafka Producer closed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())