Last active 4 hours ago

Revision 26db09f15b0e1595ed16bc5188fa3925db60bbfc

producer.py Raw
1import time
2import json
3from kafka import KafkaProducer
4
5# --- Configuration ---
6KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
7TOPIC_NAME = 'my-test-topic'
8MESSAGES_TO_SEND = 100
9
10def create_producer():
11 """Creates and returns a KafkaProducer instance."""
12 try:
13 producer = KafkaProducer(
14 bootstrap_servers=KAFKA_BROKER,
15 # Serialize messages as JSON and then encode to UTF-8 bytes
16 value_serializer=lambda v: json.dumps(v).encode('utf-8'),
17 # Key serializer
18 key_serializer=str.encode,
19 # Optional: Add acks='all' for higher durability guarantees
20 acks='all',
21 # Optional: Add retries for transient errors
22 retries=5
23 )
24 print("Kafka Producer created successfully.")
25 return producer
26 except Exception as e:
27 print(f"Error creating Kafka Producer: {e}")
28 return None
29
30def send_message(producer, topic, message_data):
31 """Sends a single message to the specified topic."""
32 try:
33 # Generate random integer between 1000 and 2000
34 random_id = int(time.time() * 1000) % 1000 + 1000
35
36 # The key can be None, or a specific value (e.g., to ensure messages with the same key go to the same partition)
37 # For this basic example, we'll use None as the key.
38 key = None
39 key = f'client_{random_id}'
40 #key = '21300'
41 #key = f'client-21300'
42 future = producer.send(topic, key=key, value=message_data)
43 # Block for 'synchronous' sends; wait for ack from broker
44 # record_metadata = future.get(timeout=10) # seconds
45 # print(f"Message sent to topic '{record_metadata.topic}' partition {record_metadata.partition} offset {record_metadata.offset}")
46 print(f"Message sent: {message_data}")
47 except Exception as e:
48 print(f"Error sending message: {e}")
49
50if __name__ == "__main__":
51 producer = create_producer()
52
53 if producer:
54 try:
55 for i in range(MESSAGES_TO_SEND):
56 message = {
57 'id': i,
58 'text': f'Hello from Python Kafka Producer! Message #{i}',
59 'timestamp': time.time()
60 }
61 send_message(producer, TOPIC_NAME, message)
62 #time.sleep(0.1) # Wait a bit between messages
63
64 # Ensure all buffered messages are sent before exiting
65 producer.flush()
66 #time.sleep(1)
67 print(f"All {MESSAGES_TO_SEND} messages sent and flushed.")
68 #time. sleep(1)
69
70 except KeyboardInterrupt:
71 print("Producer interrupted by user.")
72 finally:
73 if producer:
74 producer.close()
75 print("Kafka Producer closed.")