# 代码拉取完成,页面将自动刷新
# 同步操作将从 colin0519/kafka_batchProducer 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
# 确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import threading
from Kafka_connector.kafka_cursor import kafkaCursorMaker
import time, os, datetime
class BatchSendMessage:
    """Send ``(topic, data)`` messages to Kafka from several threads at once.

    Each worker thread gets its own Kafka cursor from ``make_kafka_cursor``.
    Every cursor is kept in ``kafka_cursor_list`` so that ``__exit__`` can
    close its producer. Typical use::

        with BatchSendMessage(config) as sender:
            sender.hight_send_message(thread_num, total_num, topic_data, batch_num)
    """

    def __init__(self, config):
        """Remember the connection ``config`` used to build Kafka cursors.

        Args:
            config: Connection settings, forwarded unchanged to
                ``kafkaCursorMaker(config=...)``.
        """
        self.kafka_cursor_list = []  # every cursor created; closed in __exit__
        self.thread_list = []  # every worker thread this instance has created
        self.config = config
        print(f'【{datetime.datetime.now()}】:Kafka连接的环境信息为:{config}')

    def make_kafka_cursor(self):
        """Build a new Kafka connector from ``self.config``."""
        return kafkaCursorMaker(config=self.config)

    def send_massage(self, topic_data_list, batch_num, thread_num, kcursor=None):
        """Send ``self.turns`` rounds of ``topic_data_list`` through one cursor.

        Intended as a worker-thread target. The method name keeps its
        original spelling so existing callers still work.

        Args:
            topic_data_list: Sequence of ``(topic, data)`` pairs. Every pair
                is sent once per round.
            batch_num: Flush after every ``batch_num`` sent messages. Values
                below 1 (or ``None``) flush after every message.
            thread_num: Index of the calling worker. Messages for a topic go
                to partition ``thread_num % <partition count of that topic>``.
            kcursor: Cursor to send through. Defaults to ``self.kcursor``
                (the most recently created cursor), as in the original API.

        Requires ``self.turns`` to be set beforehand (see ``circle_control``).
        """
        cursor = self.kcursor if kcursor is None else kcursor
        # Total number of messages this worker will send.
        each_thread_num = self.turns * len(topic_data_list)
        # Spread workers over partitions: worker i writes to partition
        # i % partition_count of each topic.
        partition_list = [
            thread_num % len(cursor.producer.partitions_for(topic=topic))
            for topic, _ in topic_data_list
        ]
        flush_every = batch_num if batch_num and batch_num > 0 else 1
        print(f'【{datetime.datetime.now()}】:开始(第一条)发送时间记录')
        # Local, not an instance attribute: several threads run this method
        # concurrently. Assumed to be the underlying producer (it exposes
        # flush()) — TODO confirm against kafkaCursorMaker.
        producer = None
        sent = 0
        for _ in range(self.turns):
            for (topic, data), partition_index in zip(topic_data_list, partition_list):
                producer = cursor.send_producer_data(
                    topic=topic, produce_value_data=data, partition=partition_index
                )
                sent += 1
                # Flush once per batch of `batch_num` messages instead of per message.
                if sent % flush_every == 0:
                    producer.flush()
        # Flush the trailing partial batch so nothing is left buffered.
        if producer is not None and sent % flush_every:
            producer.flush()
        print(f"""【{datetime.datetime.now()}】-【{threading.current_thread().ident}】:{each_thread_num}条""")

    def circle_control(self, total_num):
        """Set ``self.turns``, the number of rounds each thread runs.

        ``total_num`` is floor-divided by ``thread_num * len(topic list)``.
        Any remainder that does not split evenly is not sent.

        Raises:
            ZeroDivisionError: if ``self.thread_num`` is 0 or
                ``self.message_topic_data_list`` is empty.
        """
        per_round = self.thread_num * len(self.message_topic_data_list)
        self.turns = int(total_num // per_round)

    def hight_send_message(self, thread_num, total_num, message_topic_data_list, batch_num=500):
        """Send about ``total_num`` messages using ``thread_num`` worker threads.

        Each worker gets its own cursor and runs ``send_massage`` with its
        index as ``thread_num``. The cursor is appended to
        ``kafka_cursor_list``, and ``self.kcursor`` is left pointing at the
        last one. The call blocks until every worker started here finishes.
        Errors while starting or joining threads are printed, not raised.
        Exceptions raised inside a worker are not propagated to the caller.

        Args:
            thread_num: Number of worker threads.
            total_num: Requested message total, split by ``circle_control``.
            message_topic_data_list: Sequence of ``(topic, data)`` pairs.
            batch_num: Flush interval, in messages, passed to each worker.
        """
        self.thread_num = thread_num
        self.message_topic_data_list = message_topic_data_list
        self.circle_control(total_num=total_num)
        print(f'【{datetime.datetime.now()}】: 总数:{total_num}, 线程:{thread_num}, 单循环:{self.turns}')
        print(f'【{datetime.datetime.now()}】:正在调用多线程')
        threads = []
        for index in range(thread_num):
            # One cursor per thread, handed over explicitly, so workers never
            # share a producer through the mutable self.kcursor attribute.
            self.kcursor = self.make_kafka_cursor()
            self.kafka_cursor_list.append(self.kcursor)
            threads.append(threading.Thread(
                target=self.send_massage,
                args=(self.message_topic_data_list, batch_num, index),
                kwargs={'kcursor': self.kcursor},
            ))
        # Keep the history, but only start/join this call's threads: a
        # thread can be started only once, so earlier threads must not be restarted.
        self.thread_list.extend(threads)
        try:
            for _thread_loop_index, _th in enumerate(threads):
                print(f'【{datetime.datetime.now()}】-----------------本轮为第:{_thread_loop_index}轮循环开始, 共 {thread_num} 轮-----------------------------')
                _th.start()
            print(f'【{datetime.datetime.now()}】:多线程任务已启动')
            for _th in threads:
                _th.join()
            print(f'【{datetime.datetime.now()}】------------------ end ------------')
        except Exception as e:
            print(f'【{datetime.datetime.now()}】:未知错误,报错为 {e}')

    def __enter__(self):
        """Return ``self`` so the sender can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the producer of every cursor this instance created.

        A failing ``close()`` is printed, and the remaining producers are
        still closed. Returns ``None``, so any exception raised in the
        ``with`` body still propagates.
        """
        print(f'【{datetime.datetime.now()}】:正在执行关闭Kafka助手连接')
        for cursor in self.kafka_cursor_list:
            try:
                cursor.producer.close()
            except Exception as e:
                # Keep going: one broken producer must not leak the others.
                print(f'【{datetime.datetime.now()}】:关闭Kafka助手连接失败,报错为 {e}')
        print(f'【{datetime.datetime.now()}】:已完成关闭Kafka助手连接')
# 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
# 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。