1 Star 0 Fork 1

顚峰無琂/kafka_batchProducer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
send_control_main.py 3.74 KB
一键复制 编辑 原始数据 按行查看 历史
colin0519 提交于 2023-03-01 15:51 . solve 5000 batch send bug
import threading
from Kafka_connector.kafka_cursor import kafkaCursorMaker
import time, os, datetime
class BatchSendMessage:
    """Send Kafka messages concurrently from several threads, one cursor per thread.

    Intended to be used as a context manager so that every producer it
    creates is closed on exit::

        with BatchSendMessage(config) as sender:
            sender.hight_send_message(thread_num, total_num, topic_data, batch_num)

    ``config`` is passed unchanged to ``kafkaCursorMaker``; the keys it expects
    are defined by that project helper (not visible here).
    """

    def __init__(self, config):
        # One Kafka cursor (connection helper) per worker thread; closed in __exit__.
        self.kafka_cursor_list = []
        self.thread_list = []
        self.config = config
        print(f'【{datetime.datetime.now()}】:Kafka连接的环境信息为:{config}')

    def make_kafka_cursor(self):
        """Build a new Kafka connection helper from ``self.config``."""
        return kafkaCursorMaker(config=self.config)

    def send_massage(self, topic_data_list, batch_num, thread_num, kcursor=None):
        """Send ``self.turns`` rounds of every ``(topic, data)`` pair through one cursor.

        Args:
            topic_data_list: sequence of ``(topic, data)`` pairs; every round
                sends each pair once.
            batch_num: flush the producer after every ``batch_num`` messages
                (values below 1 flush after every message). A trailing
                partial batch is flushed at the end.
            thread_num: index of the calling worker; for each topic the
                partition index ``thread_num % <number of partitions>`` is used.
            kcursor: cursor this worker sends through. Defaults to
                ``self.kcursor`` for backward compatibility; pass it explicitly
                when several workers run at once so each uses its own cursor.
        """
        cursor = self.kcursor if kcursor is None else kcursor
        batch_size = max(batch_num, 1)
        # Total number of messages this thread will send.
        each_thread_num = self.turns * len(topic_data_list)
        # Partition index per topic, spread across workers by thread index.
        partition_num_list = [
            thread_num % len(cursor.producer.partitions_for(topic=topic))
            for topic, _ in topic_data_list
        ]
        print(f'【{datetime.datetime.now()}】:开始(第一条)发送时间记录')
        sender = None
        pending = 0  # messages sent since the last flush
        for _ in range(self.turns):
            for (topic, data), partition_index in zip(topic_data_list, partition_num_list):
                # Kept local: a shared attribute would race between worker threads.
                sender = cursor.send_producer_data(
                    topic=topic, produce_value_data=data, partition=partition_index
                )
                pending += 1
                if pending >= batch_size:
                    sender.flush()
                    pending = 0
        # Flush the trailing partial batch so nothing stays buffered.
        if pending and sender is not None:
            sender.flush()
        print(f'【{datetime.datetime.now()}】-【{threading.current_thread().ident}】:{each_thread_num}条')

    def circle_control(self, total_num):
        """Set ``self.turns``, the number of rounds each thread runs.

        ``turns = int(total_num / thread_num / len(message_topic_data_list))``.
        Because of the truncation, when ``total_num`` is not a multiple of
        ``thread_num * len(message_topic_data_list)`` the remainder is not sent.
        """
        self.turns = int(total_num / self.thread_num / len(self.message_topic_data_list))

    def hight_send_message(self, thread_num, total_num, message_topic_data_list, batch_num):
        """Create ``thread_num`` workers, each with its own cursor, and run them to completion.

        Args:
            thread_num: number of worker threads (and Kafka cursors) to create.
            total_num: total number of messages to send across all threads.
            message_topic_data_list: sequence of ``(topic, data)`` pairs sent each round.
            batch_num: flush interval forwarded to ``send_massage``.
        """
        self.thread_num = thread_num
        self.message_topic_data_list = message_topic_data_list
        self.circle_control(total_num=total_num)
        print(f'【{datetime.datetime.now()}】: 总数:{total_num}, 线程:{thread_num}, 单循环:{self.turns}')
        print(f'【{datetime.datetime.now()}】:正在调用多线程')
        new_threads = []
        for _th_ in range(thread_num):
            # Build a dedicated cursor for this worker and keep it so __exit__ closes it.
            self.kcursor = self.make_kafka_cursor()
            self.kafka_cursor_list.append(self.kcursor)
            thread = threading.Thread(
                target=self.send_massage,
                args=(self.message_topic_data_list, batch_num, _th_),
                # Bind the cursor now: self.kcursor is reassigned on the next
                # iteration, before any thread has started.
                kwargs={'kcursor': self.kcursor},
            )
            new_threads.append(thread)
        self.thread_list.extend(new_threads)
        try:
            # Only start/join this call's threads; earlier ones cannot be restarted.
            for _thread_loop_index, _th in enumerate(new_threads):
                print(f'【{datetime.datetime.now()}】-----------------本轮为第:{_thread_loop_index}轮循环开始, 共 {thread_num} 轮-----------------------------')
                _th.start()
            print(f'【{datetime.datetime.now()}】:多线程任务已启动')
            for _th in new_threads:
                _th.join()
            print(f'【{datetime.datetime.now()}】------------------ end ------------')
        except Exception as e:
            print(f'【{datetime.datetime.now()}】:未知错误,报错为 {e}')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close every producer created by this instance.

        Every producer is attempted even if one ``close()`` fails; the first
        failure is re-raised after all of them have been tried.
        """
        print(f'【{datetime.datetime.now()}】:正在执行关闭Kafka助手连接')
        first_error = None
        for cursor in self.kafka_cursor_list:
            try:
                cursor.producer.close()
            except Exception as err:  # keep closing the remaining producers
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error
        print(f'【{datetime.datetime.now()}】:已完成关闭Kafka助手连接')
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/summithwang/kafka_batch-producer.git
[email protected]:summithwang/kafka_batch-producer.git
summithwang
kafka_batch-producer
kafka_batchProducer
master

搜索帮助