producer.py
· 2.7 KiB · Python
Raw
import time
import json
from kafka import KafkaProducer
# --- Configuration ---
# Bootstrap broker list; the client discovers the rest of the cluster from these.
KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003']
# Target topic (assumed to already exist or be auto-created by the broker — TODO confirm).
TOPIC_NAME = 'my-test-topic'
# Number of demo messages the __main__ block produces.
MESSAGES_TO_SEND = 100
def create_producer():
    """Create and return a configured KafkaProducer, or None on failure.

    Returns:
        KafkaProducer connected to KAFKA_BROKER, or None if construction
        failed (e.g. no brokers reachable). Callers must check for None.
    """
    try:
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BROKER,
            # Serialize message values as JSON, then encode to UTF-8 bytes.
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # None-safe key serializer: the previous str.encode raised
            # TypeError for a None key, but Kafka permits keyless messages.
            key_serializer=lambda k: k.encode('utf-8') if k is not None else None,
            # acks='all': wait for the full in-sync replica set -> durability.
            acks='all',
            # Retry transient errors (e.g. leader election) before failing.
            retries=5,
        )
        print("Kafka Producer created successfully.")
        return producer
    except Exception as e:
        # Broad catch is deliberate: any construction failure is reported
        # and signalled to the caller via the None return.
        print(f"Error creating Kafka Producer: {e}")
        return None
def send_message(producer, topic, message_data):
    """Send a single message to *topic*, fire-and-forget.

    Args:
        producer: KafkaProducer (or any object exposing a compatible .send()).
        topic: target topic name.
        message_data: JSON-serializable payload (encoded by the producer's
            configured value_serializer).

    Errors are caught and printed rather than raised. Sending is
    asynchronous — call producer.flush() to guarantee delivery before exit.
    """
    try:
        # Derive a pseudo-id in [1000, 1999] from the current time in ms.
        # NOTE: not random — messages sent in the same millisecond share a key.
        random_id = int(time.time() * 1000) % 1000 + 1000
        # Messages with the same key are routed to the same partition.
        key = f'client_{random_id}'
        future = producer.send(topic, key=key, value=message_data)
        # To send synchronously instead, block on the future:
        #   record_metadata = future.get(timeout=10)  # seconds
        print(f"Message sent: {message_data}")
    except Exception as e:
        print(f"Error sending message: {e}")
if __name__ == "__main__":
    producer = create_producer()

    if producer:
        try:
            # Queue the whole batch; send_message is asynchronous.
            for i in range(MESSAGES_TO_SEND):
                message = {
                    'id': i,
                    'text': f'Hello from Python Kafka Producer! Message #{i}',
                    'timestamp': time.time(),
                }
                send_message(producer, TOPIC_NAME, message)

            # Block until every buffered message is delivered (or has failed).
            producer.flush()
            print(f"All {MESSAGES_TO_SEND} messages sent and flushed.")
        except KeyboardInterrupt:
            print("Producer interrupted by user.")
        finally:
            # Always release network resources, even on interrupt.
            producer.close()
            print("Kafka Producer closed.")
| 1 | import time |
| 2 | import json |
| 3 | from kafka import KafkaProducer |
| 4 | |
| 5 | # --- Configuration --- |
| 6 | KAFKA_BROKER = ['localhost:29001', 'localhost:29002', 'localhost:29003'] |
| 7 | TOPIC_NAME = 'my-test-topic' |
| 8 | MESSAGES_TO_SEND = 100 |
| 9 | |
| 10 | def create_producer(): |
| 11 | """Creates and returns a KafkaProducer instance.""" |
| 12 | try: |
| 13 | producer = KafkaProducer( |
| 14 | bootstrap_servers=KAFKA_BROKER, |
| 15 | # Serialize messages as JSON and then encode to UTF-8 bytes |
| 16 | value_serializer=lambda v: json.dumps(v).encode('utf-8'), |
| 17 | # Key serializer |
| 18 | key_serializer=str.encode, |
| 19 | # Optional: Add acks='all' for higher durability guarantees |
| 20 | acks='all', |
| 21 | # Optional: Add retries for transient errors |
| 22 | retries=5 |
| 23 | ) |
| 24 | print("Kafka Producer created successfully.") |
| 25 | return producer |
| 26 | except Exception as e: |
| 27 | print(f"Error creating Kafka Producer: {e}") |
| 28 | return None |
| 29 | |
| 30 | def send_message(producer, topic, message_data): |
| 31 | """Sends a single message to the specified topic.""" |
| 32 | try: |
| 33 | # Generate random integer between 1000 and 2000 |
| 34 | random_id = int(time.time() * 1000) % 1000 + 1000 |
| 35 | |
| 36 | # The key can be None, or a specific value (e.g., to ensure messages with the same key go to the same partition) |
| 37 | # For this basic example, we'll use None as the key. |
| 38 | key = None |
| 39 | key = f'client_{random_id}' |
| 40 | #key = '21300' |
| 41 | #key = f'client-21300' |
| 42 | future = producer.send(topic, key=key, value=message_data) |
| 43 | # Block for 'synchronous' sends; wait for ack from broker |
| 44 | # record_metadata = future.get(timeout=10) # seconds |
| 45 | # print(f"Message sent to topic '{record_metadata.topic}' partition {record_metadata.partition} offset {record_metadata.offset}") |
| 46 | print(f"Message sent: {message_data}") |
| 47 | except Exception as e: |
| 48 | print(f"Error sending message: {e}") |
| 49 | |
| 50 | if __name__ == "__main__": |
| 51 | producer = create_producer() |
| 52 | |
| 53 | if producer: |
| 54 | try: |
| 55 | for i in range(MESSAGES_TO_SEND): |
| 56 | message = { |
| 57 | 'id': i, |
| 58 | 'text': f'Hello from Python Kafka Producer! Message #{i}', |
| 59 | 'timestamp': time.time() |
| 60 | } |
| 61 | send_message(producer, TOPIC_NAME, message) |
| 62 | #time.sleep(0.1) # Wait a bit between messages |
| 63 | |
| 64 | # Ensure all buffered messages are sent before exiting |
| 65 | producer.flush() |
| 66 | #time.sleep(1) |
| 67 | print(f"All {MESSAGES_TO_SEND} messages sent and flushed.") |
| 68 | #time. sleep(1) |
| 69 | |
| 70 | except KeyboardInterrupt: |
| 71 | print("Producer interrupted by user.") |
| 72 | finally: |
| 73 | if producer: |
| 74 | producer.close() |
| 75 | print("Kafka Producer closed.") |