python实现websocket的客户端压力测试

python实现websocket的客户端压力测试

2023年7月11日发(作者:)

python实现websocket的客户端压⼒测试使⽤python进⾏websocket的客户端压⼒测试,这个代码是从github上 找到。然后简单修改了下。⼤神运⽤了进程池,以及线程池的内容。所以保存下来,学习学习然后需要说明的是:本次⽤的python2.7,也尝试⽤python3.6,但是⽼实出现websocket-client包和python3不能兼容的情况,提⽰没有相关的⽅法。所以不得已最后⼜采⽤了python2# -*- coding:utf-8 -*-# __author__ == 'chenmingle'

import websocketimport timeimport threadingimport jsonimport multiprocessingimport uuidfrom threadpool import ThreadPool, makeRequests

# 修改成⾃⼰的websocket地址WS_URL = "xxxx"# 定义进程数processes = 4# 定义线程数(每个⽂件可能限制1024个,可以修改等参数)thread_num = 700index = 1

def on_message(ws, message): # print(message) pass

def on_error(ws, error): print(error) pass

def on_close(ws): # print("### closed ###") pass

def on_open(ws): global index index = index + 1

def send_thread(): # 设置你websocket的内容 # 每隔10秒发送⼀下数据使链接不中断 while True: (u'hello服务器') (10)

t = (target=send_thread) ()

def on_start(num): (5) # Trace(True) ws = ketApp(WS_URL + str(num), on_message=on_message, on_error=on_error, on_close=on_close) _open = on_open _forever()

def thread_web_socket(): # 线程池 pool_list = ThreadPool(thread_num) num = list() # 设置开启线程的数量 for ir in range(thread_num): (ir) requests = makeRequests(on_start, num) [pool_uest(req) for req in requests] pool_()

if __name__ == "__main__": # 进程池 pool = (processes=processes) # 设置开启进程的数量 for i in xrange(processes): _async(thread_web_socket) () ()以上就是本⽂的全部内容,希望对⼤家的学习有所帮助,也希望⼤家多多⽀持。

发布者:admin,转转请注明出处:http://www.yc00.com/web/1689028117a197282.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信