consumer.py
· 2.8 KiB · Python
Raw
from time import sleep
import json
from kafka import KafkaConsumer
# --- Configuration ---
# Bootstrap servers for a local 3-node cluster; ports suggest host-mapped
# Docker listeners — confirm against the broker compose/config.
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
TOPIC_NAME = 'my-test-topic'
CONSUMER_GROUP_ID = 'my-python-consumer-group2' # Important for parallelism and offset tracking
def create_consumer():
    """Build a KafkaConsumer subscribed to TOPIC_NAME.

    Returns:
        The consumer on success, or None if construction fails
        (the error is reported to stdout).
    """
    try:
        kafka_consumer = KafkaConsumer(
            TOPIC_NAME,  # topic(s) to subscribe to
            bootstrap_servers=KAFKA_BROKER,
            # 'earliest' replays from the start of the log when no committed
            # offset exists; 'latest' would only pick up new messages.
            auto_offset_reset='earliest',
            # The group id drives partition assignment and offset tracking.
            group_id=CONSUMER_GROUP_ID,
            # Each payload is UTF-8 bytes holding a JSON document.
            value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
            # Left at defaults: consumer_timeout_ms (iterate forever),
            # enable_auto_commit (True), auto_commit_interval_ms (5000).
        )
        print("Kafka Consumer created successfully. Subscribed to topic:", TOPIC_NAME)
        return kafka_consumer
    except Exception as e:
        print(f"Error creating Kafka Consumer: {e}")
        return None
if __name__ == "__main__":
    consumer = create_consumer()

    if consumer:
        print("Listening for messages... (Press Ctrl+C to stop)")
        try:
            # Each record exposes topic/partition/offset/key/value; only the
            # record's coordinates are printed here, not its payload.
            for record in consumer:
                print(f"Topic: {record.topic}, Partition: {record.partition}, Offset: {record.offset}")
                # Offsets are auto-committed (default config); a manual
                # consumer.commit() would only be needed with auto-commit off.
        except KeyboardInterrupt:
            print("\nConsumer interrupted by user.")
        except Exception as e:
            print(f"An error occurred during consumption: {e}")
        finally:
            if consumer:
                consumer.close()
                print("Kafka Consumer closed.")
| 1 | from time import sleep |
| 2 | import json |
| 3 | from kafka import KafkaConsumer |
| 4 | |
| 5 | # --- Configuration --- |
| 6 | KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003'] |
| 7 | TOPIC_NAME = 'my-test-topic' |
| 8 | CONSUMER_GROUP_ID = 'my-python-consumer-group2' # Important for parallelism and offset tracking |
| 9 | |
| 10 | def create_consumer(): |
| 11 | """Creates and returns a KafkaConsumer instance.""" |
| 12 | try: |
| 13 | consumer = KafkaConsumer( |
| 14 | TOPIC_NAME, # Topic(s) to subscribe to |
| 15 | bootstrap_servers=KAFKA_BROKER, |
| 16 | auto_offset_reset='earliest', # 'earliest' to read from the beginning, 'latest' for new messages only |
| 17 | group_id=CONSUMER_GROUP_ID, # Consumer group ID |
| 18 | # Deserialize messages from UTF-8 bytes and then parse JSON |
| 19 | value_deserializer=lambda v: json.loads(v.decode('utf-8')), |
| 20 | # Optional: consumer_timeout_ms=1000 # Stop iterating if no message after 1s |
| 21 | # Optional: enable_auto_commit=True # Default is True |
| 22 | # Optional: auto_commit_interval_ms=5000 # Default is 5s |
| 23 | ) |
| 24 | print("Kafka Consumer created successfully. Subscribed to topic:", TOPIC_NAME) |
| 25 | return consumer |
| 26 | except Exception as e: |
| 27 | print(f"Error creating Kafka Consumer: {e}") |
| 28 | return None |
| 29 | |
| 30 | if __name__ == "__main__": |
| 31 | consumer = create_consumer() |
| 32 | |
| 33 | if consumer: |
| 34 | print("Listening for messages... (Press Ctrl+C to stop)") |
| 35 | try: |
| 36 | for message in consumer: |
| 37 | # message object contains details like: |
| 38 | # message.topic, message.partition, message.offset, |
| 39 | # message.key, message.value |
| 40 | |
| 41 | #print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}") |
| 42 | print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}") |
| 43 | |
| 44 | # print(f"\nReceived message:") |
| 45 | # print(f" Topic: {message.topic}") |
| 46 | # print(f" Partition: {message.partition}") |
| 47 | # print(f" Offset: {message.offset}") |
| 48 | # # print(f" Key: {message.key}") # We didn't send a key in this example |
| 49 | # print(f" Value: {message.value}") |
| 50 | # print("Processing now....") |
| 51 | # #sleep(0.1) |
| 52 | # print("Processing complete! Move to next msg!") |
| 53 | |
| 54 | |
| 55 | # Optionally, you might want to commit offsets manually if auto_commit is off |
| 56 | # consumer.commit() |
| 57 | |
| 58 | except KeyboardInterrupt: |
| 59 | print("\nConsumer interrupted by user.") |
| 60 | except Exception as e: |
| 61 | print(f"An error occurred during consumption: {e}") |
| 62 | finally: |
| 63 | if consumer: |
| 64 | consumer.close() |
| 65 | print("Kafka Consumer closed.") |
producer.py
· 2.7 KiB · Python
Raw
import time
import json
from kafka import KafkaProducer
# --- Configuration ---
# Bootstrap servers for a local 3-node cluster; ports suggest host-mapped
# Docker listeners — confirm against the broker compose/config.
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
TOPIC_NAME = 'my-test-topic'
# Number of messages produced per run of this script.
MESSAGES_TO_SEND = 100
def create_producer():
    """Build a KafkaProducer for JSON payloads with string keys.

    Returns:
        The producer on success, or None if construction fails
        (the error is reported to stdout).
    """
    try:
        json_producer = KafkaProducer(
            bootstrap_servers=KAFKA_BROKER,
            # Payloads are dicts: dump to JSON, then encode to UTF-8 bytes.
            value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
            # Keys are plain strings; encode them to bytes.
            key_serializer=str.encode,
            # Durability over latency: wait for all in-sync replicas to ack.
            acks='all',
            # Retry transient broker errors a few times before giving up.
            retries=5,
        )
        print("Kafka Producer created successfully.")
        return json_producer
    except Exception as e:
        print(f"Error creating Kafka Producer: {e}")
        return None
def send_message(producer, topic, message_data):
    """Send a single message to *topic* via *producer* (best-effort).

    The record key is derived from the current clock: the millisecond
    component mapped into [1000, 1999] and formatted as 'client_<id>'.
    NOTE: despite looking random, messages sent within the same
    millisecond share a key and therefore land on the same partition.

    Args:
        producer: a KafkaProducer (or compatible object with .send()).
        topic: destination topic name.
        message_data: JSON-serializable payload (serialized by the
            producer's value_serializer).

    Errors are reported to stdout and swallowed — the caller is not
    notified of failures.
    """
    try:
        # Clock-derived id in [1000, 1999] (not truly random: it is the
        # millisecond component of time.time(), shifted by 1000).
        time_based_id = int(time.time() * 1000) % 1000 + 1000
        key = f'client_{time_based_id}'

        # send() is asynchronous; uncomment the .get() below to block until
        # the broker acknowledges the record (synchronous send).
        future = producer.send(topic, key=key, value=message_data)
        # record_metadata = future.get(timeout=10)  # seconds
        print(f"Message sent: {message_data}")
    except Exception as e:
        print(f"Error sending message: {e}")
if __name__ == "__main__":
    producer = create_producer()

    if producer:
        try:
            for msg_no in range(MESSAGES_TO_SEND):
                payload = {
                    'id': msg_no,
                    'text': f'Hello from Python Kafka Producer! Message #{msg_no}',
                    'timestamp': time.time(),
                }
                send_message(producer, TOPIC_NAME, payload)

            # send() is asynchronous; flush() blocks until every buffered
            # record has been delivered (or failed) before we exit.
            producer.flush()
            print(f"All {MESSAGES_TO_SEND} messages sent and flushed.")
        except KeyboardInterrupt:
            print("Producer interrupted by user.")
        finally:
            if producer:
                producer.close()
                print("Kafka Producer closed.")
| 1 | import time |
| 2 | import json |
| 3 | from kafka import KafkaProducer |
| 4 | |
| 5 | # --- Configuration --- |
| 6 | KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003'] |
| 7 | TOPIC_NAME = 'my-test-topic' |
| 8 | MESSAGES_TO_SEND = 100 |
| 9 | |
| 10 | def create_producer(): |
| 11 | """Creates and returns a KafkaProducer instance.""" |
| 12 | try: |
| 13 | producer = KafkaProducer( |
| 14 | bootstrap_servers=KAFKA_BROKER, |
| 15 | # Serialize messages as JSON and then encode to UTF-8 bytes |
| 16 | value_serializer=lambda v: json.dumps(v).encode('utf-8'), |
| 17 | # Key serializer |
| 18 | key_serializer=str.encode, |
| 19 | # Optional: Add acks='all' for higher durability guarantees |
| 20 | acks='all', |
| 21 | # Optional: Add retries for transient errors |
| 22 | retries=5 |
| 23 | ) |
| 24 | print("Kafka Producer created successfully.") |
| 25 | return producer |
| 26 | except Exception as e: |
| 27 | print(f"Error creating Kafka Producer: {e}") |
| 28 | return None |
| 29 | |
| 30 | def send_message(producer, topic, message_data): |
| 31 | """Sends a single message to the specified topic.""" |
| 32 | try: |
| 33 | # Generate random integer between 1000 and 2000 |
| 34 | random_id = int(time.time() * 1000) % 1000 + 1000 |
| 35 | |
| 36 | # The key can be None, or a specific value (e.g., to ensure messages with the same key go to the same partition) |
| 37 | # For this basic example, we'll use None as the key. |
| 38 | key = None |
| 39 | key = f'client_{random_id}' |
| 40 | #key = '21300' |
| 41 | #key = f'client-21300' |
| 42 | future = producer.send(topic, key=key, value=message_data) |
| 43 | # Block for 'synchronous' sends; wait for ack from broker |
| 44 | # record_metadata = future.get(timeout=10) # seconds |
| 45 | # print(f"Message sent to topic '{record_metadata.topic}' partition {record_metadata.partition} offset {record_metadata.offset}") |
| 46 | print(f"Message sent: {message_data}") |
| 47 | except Exception as e: |
| 48 | print(f"Error sending message: {e}") |
| 49 | |
| 50 | if __name__ == "__main__": |
| 51 | producer = create_producer() |
| 52 | |
| 53 | if producer: |
| 54 | try: |
| 55 | for i in range(MESSAGES_TO_SEND): |
| 56 | message = { |
| 57 | 'id': i, |
| 58 | 'text': f'Hello from Python Kafka Producer! Message #{i}', |
| 59 | 'timestamp': time.time() |
| 60 | } |
| 61 | send_message(producer, TOPIC_NAME, message) |
| 62 | #time.sleep(0.1) # Wait a bit between messages |
| 63 | |
| 64 | # Ensure all buffered messages are sent before exiting |
| 65 | producer.flush() |
| 66 | #time.sleep(1) |
| 67 | print(f"All {MESSAGES_TO_SEND} messages sent and flushed.") |
| 68 | #time. sleep(1) |
| 69 | |
| 70 | except KeyboardInterrupt: |
| 71 | print("Producer interrupted by user.") |
| 72 | finally: |
| 73 | if producer: |
| 74 | producer.close() |
| 75 | print("Kafka Producer closed.") |
| 1 | kafka-python==2.2.10 |
| 2 | murmurhash2==0.2.10 |