Last active 2 hours ago

mreschke revised this gist 2 hours ago. (Go to revision)

No changes

mreschke revised this gist 3 hours ago. (Go to revision)

1 file changed, 1 insertion, 1 deletion

consumer.py

@@ -1,7 +1,7 @@
1 1 from time import sleep
2 2 import json
3 3 from kafka import KafkaConsumer
4 -
4 + #
5 5 # --- Configuration ---
6 6 KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
7 7 TOPIC_NAME = 'my-test-topic'

mreschke revised this gist 3 hours ago. (Go to revision)

2 files changed, 67 insertions

consumer.py(file created)

@@ -0,0 +1,65 @@
1 + from time import sleep
2 + import json
3 + from kafka import KafkaConsumer
4 +
# --- Configuration ---
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']  # bootstrap brokers (host:port)
TOPIC_NAME = 'my-test-topic'  # topic this consumer subscribes to
CONSUMER_GROUP_ID = 'my-python-consumer-group2' # Important for parallelism and offset tracking
9 +
def create_consumer():
    """Create and return a KafkaConsumer subscribed to TOPIC_NAME.

    Returns:
        KafkaConsumer on success, or None if construction fails
        (e.g. brokers unreachable / bad configuration).
    """
    try:
        consumer = KafkaConsumer(
            TOPIC_NAME,  # topic(s) to subscribe to
            bootstrap_servers=KAFKA_BROKER,
            # 'earliest' replays the topic from the beginning for a new
            # group id; 'latest' would only see messages produced after start.
            auto_offset_reset='earliest',
            group_id=CONSUMER_GROUP_ID,  # enables offset tracking / parallelism
            # Records arrive as UTF-8 bytes containing JSON; decode then parse.
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            # Optional knobs (library defaults shown):
            #   consumer_timeout_ms=1000      # stop iterating if idle for 1s
            #   enable_auto_commit=True       # default is True
            #   auto_commit_interval_ms=5000  # default is 5s
        )
    except Exception as e:
        print(f"Error creating Kafka Consumer: {e}")
        return None
    print("Kafka Consumer created successfully. Subscribed to topic:", TOPIC_NAME)
    return consumer
29 +
if __name__ == "__main__":
    consumer = create_consumer()

    if consumer:
        print("Listening for messages... (Press Ctrl+C to stop)")
        try:
            # Each record exposes topic, partition, offset, key and the
            # JSON-decoded value; we log the coordinates of every message.
            for message in consumer:
                print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
                # Offsets are committed automatically (auto-commit default);
                # call consumer.commit() here instead if auto-commit is off.
        except KeyboardInterrupt:
            print("\nConsumer interrupted by user.")
        except Exception as e:
            print(f"An error occurred during consumption: {e}")
        finally:
            # Always release the consumer's sockets and leave the group cleanly.
            if consumer:
                consumer.close()
                print("Kafka Consumer closed.")

requirements.txt(file created)

@@ -0,0 +1,2 @@
1 + kafka-python==2.2.10
2 + murmurhash2==0.2.10

mreschke revised this gist 3 hours ago. (Go to revision)

1 file changed, 75 insertions

producer.py(file created)

@@ -0,0 +1,75 @@
1 + import time
2 + import json
3 + from kafka import KafkaProducer
4 +
# --- Configuration ---
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']  # bootstrap brokers (host:port)
TOPIC_NAME = 'my-test-topic'  # topic messages are produced to
MESSAGES_TO_SEND = 100  # total messages produced per run
9 +
def create_producer():
    """Create and return a KafkaProducer, or None if construction fails."""
    try:
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BROKER,
            # Values are serialized as JSON, then encoded to UTF-8 bytes.
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=str.encode,  # keys are plain strings
            acks='all',  # wait for all in-sync replicas (higher durability)
            retries=5,   # retry transient broker errors
        )
    except Exception as e:
        print(f"Error creating Kafka Producer: {e}")
        return None
    print("Kafka Producer created successfully.")
    return producer
29 +
def send_message(producer, topic, message_data):
    """Send a single message to *topic* via *producer*.

    The key is derived from the current time — millis % 1000 + 1000 —
    i.e. a pseudo-random value in [1000, 1999]. Messages sharing a key
    always land on the same partition; pass key=None to let Kafka
    distribute across partitions instead. Errors are reported but not
    raised, matching the script's best-effort style.
    """
    try:
        # Time-derived pseudo-random client id in the range 1000..1999.
        # (Original comment claimed "between 1000 and 2000" — the modulo
        # arithmetic never produces 2000, and the value is not random.)
        random_id = int(time.time() * 1000) % 1000 + 1000
        key = f'client_{random_id}'

        # send() is asynchronous; the returned future resolves on broker ack.
        future = producer.send(topic, key=key, value=message_data)
        # For a 'synchronous' send, block on the ack:
        # record_metadata = future.get(timeout=10)  # seconds
        # print(f"Message sent to topic '{record_metadata.topic}' partition {record_metadata.partition} offset {record_metadata.offset}")
        print(f"Message sent: {message_data}")
    except Exception as e:
        print(f"Error sending message: {e}")
49 +
if __name__ == "__main__":
    producer = create_producer()

    if producer:
        try:
            for i in range(MESSAGES_TO_SEND):
                payload = {
                    'id': i,
                    'text': f'Hello from Python Kafka Producer! Message #{i}',
                    'timestamp': time.time(),
                }
                send_message(producer, TOPIC_NAME, payload)

            # send() only enqueues; flush() blocks until every buffered
            # message has actually been delivered to the brokers.
            producer.flush()
            print(f"All {MESSAGES_TO_SEND} messages sent and flushed.")
        except KeyboardInterrupt:
            print("Producer interrupted by user.")
        finally:
            # Release network resources regardless of how the loop ended.
            if producer:
                producer.close()
                print("Kafka Producer closed.")
Newer Older