Last active 4 hours ago

Revision be92ce383040348e4b181d85837feb5a611c9091

consumer.py Raw
1from time import sleep
2import json
3from kafka import KafkaConsumer
#
# --- Configuration ---
# All three brokers of the local cluster; one reachable address is enough
# to bootstrap, the extra entries add resilience.
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
TOPIC_NAME = 'my-test-topic'
# Consumers sharing this group id split the topic's partitions between
# them and share committed offsets.
CONSUMER_GROUP_ID = 'my-python-consumer-group2' # Important for parallelism and offset tracking
def create_consumer():
    """Build a KafkaConsumer subscribed to TOPIC_NAME, or None on failure.

    The consumer joins CONSUMER_GROUP_ID, starts from the earliest
    available offset for a new group, and decodes every record value as
    UTF-8 JSON.
    """
    try:
        kafka_consumer = KafkaConsumer(
            TOPIC_NAME,                    # topic(s) to subscribe to
            bootstrap_servers=KAFKA_BROKER,
            auto_offset_reset='earliest',  # read from the beginning; 'latest' would skip history
            group_id=CONSUMER_GROUP_ID,    # enables offset tracking and parallelism
            # Decode the raw bytes as UTF-8, then parse the JSON payload.
            value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
            # Optional knobs (library defaults in use):
            #   consumer_timeout_ms=1000     -> stop iterating after 1s of silence
            #   enable_auto_commit=True      -> default
            #   auto_commit_interval_ms=5000 -> default (5s)
        )
    except Exception as exc:
        print(f"Error creating Kafka Consumer: {exc}")
        return None
    print("Kafka Consumer created successfully. Subscribed to topic:", TOPIC_NAME)
    return kafka_consumer
29
if __name__ == "__main__":
    consumer = create_consumer()

    if consumer:
        print("Listening for messages... (Press Ctrl+C to stop)")
        try:
            # Each record exposes .topic, .partition, .offset, .key and
            # .value; .value has already been JSON-decoded by the
            # consumer's value_deserializer.
            for record in consumer:
                print(f"Topic: {record.topic}, Partition: {record.partition}, Offset: {record.offset}")
                # enable_auto_commit is left at its default (True), so no
                # manual consumer.commit() is required here.
        except KeyboardInterrupt:
            print("\nConsumer interrupted by user.")
        except Exception as exc:
            print(f"An error occurred during consumption: {exc}")
        finally:
            if consumer:
                consumer.close()
                print("Kafka Consumer closed.")
producer.py Raw
1import time
2import json
3from kafka import KafkaProducer
4
# --- Configuration ---
# Same three-broker local cluster as consumer.py; one reachable address
# is enough to bootstrap.
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
TOPIC_NAME = 'my-test-topic'
# Number of JSON messages the demo publishes before flushing and exiting.
MESSAGES_TO_SEND = 100
def create_producer():
    """Build a KafkaProducer for the configured brokers, or None on failure.

    Values are serialized as UTF-8 JSON bytes and keys as UTF-8 strings.
    acks='all' with retries=5 trades latency for delivery durability.
    """
    try:
        kafka_producer = KafkaProducer(
            bootstrap_servers=KAFKA_BROKER,
            # JSON-encode the payload, then encode to UTF-8 bytes.
            value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
            key_serializer=str.encode,  # string keys -> UTF-8 bytes
            acks='all',                 # wait for full in-sync-replica ack
            retries=5,                  # retry transient send errors
        )
    except Exception as exc:
        print(f"Error creating Kafka Producer: {exc}")
        return None
    print("Kafka Producer created successfully.")
    return kafka_producer
29
def send_message(producer, topic, message_data):
    """Send one message to *topic* via *producer*, fire-and-forget.

    A key of the form ``client_<n>`` is attached so that records sharing a
    key are routed to the same partition. Errors are reported to stdout
    rather than raised, matching the script's best-effort style.

    Args:
        producer: object with a ``send(topic, key=..., value=...)`` method
            (normally a KafkaProducer).
        topic: destination topic name.
        message_data: JSON-serializable payload (serialized by the
            producer's value_serializer).
    """
    try:
        # NOTE: not truly random — derived from the wall clock, so calls in
        # the same millisecond share a key. Resulting range is 1000-1999.
        client_id = int(time.time() * 1000) % 1000 + 1000
        key = f'client_{client_id}'

        producer.send(topic, key=key, value=message_data)
        # For a synchronous send, keep the returned future and call
        # future.get(timeout=10) to block until the broker acks.
        print(f"Message sent: {message_data}")
    except Exception as e:
        print(f"Error sending message: {e}")
49
if __name__ == "__main__":
    producer = create_producer()

    if producer:
        try:
            # Publish MESSAGES_TO_SEND small JSON payloads, then flush so
            # nothing is left in the client-side send buffer on exit.
            for seq in range(MESSAGES_TO_SEND):
                payload = {
                    'id': seq,
                    'text': f'Hello from Python Kafka Producer! Message #{seq}',
                    'timestamp': time.time(),
                }
                send_message(producer, TOPIC_NAME, payload)

            producer.flush()
            print(f"All {MESSAGES_TO_SEND} messages sent and flushed.")
        except KeyboardInterrupt:
            print("Producer interrupted by user.")
        finally:
            if producer:
                producer.close()
                print("Kafka Producer closed.")
requirements.txt Raw
1kafka-python==2.2.10
2murmurhash2==0.2.10