使用locust进行Websocket压力测试和接口压力测试

安装

安装 Locust:

1
pip3 install locust

查看版本

1
2
locust -V
# locust 2.5.1

基本用法

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from locust import User, events, task, between
import random


def success_call(name, recvText, total_time):
events.request_success.fire(
request_type="[Success]",
name=name,
response_time=total_time,
response_length=len(recvText)
)


def fail_call(name, total_time):
events.request_failure.fire(
request_type="[Fail]",
name=name,
response_time=total_time,
response_length=0,
exception=Exception(),
)


class MyUser(User):
wait_time = between(1, 5)

@task(1)
def job1(self):
print('This is job 1')
self.sendMsg("job1", 'This is job 2')

@task(2)
def job2(self):
print('This is job 2')
self.sendMsg("job2", 'This is job 2')

def sendMsg(self, methodname, msg):
num = random.randint(0, 2)
time = random.randint(10, 50)
if (num == 0):
fail_call(methodname, time)
else:
success_call(methodname, msg, time);

执行

1
locust -f test4.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

其中

  • request_type应该是请求的方式,这里为了方便理解,直接使用的[Success][Fail],来区分成功的请求和失败的请求,只有成功或失败的回调被调用后,locust才会对其统计。

  • @task(2)里面的数字是权重。

  • wait_time 是两个任务之间的间隔时间,如果不设置则会尽可能快的执行。

    • wait_time = between(1, 5) 间隔是1-5秒随机值
    • wait_time = constant(1) 间隔固定1秒

注意:

所有的task不能是异步方法。

结果

image-20220116154519702

TaskSet

如果你正在测试一个网站的性能,这个网站是以分层的方式构建的,包括部分和子部分,那么以同样的方式构建负载测试可能会很有用。为此,locust提供了任务集类。它是将执行的任务的集合,与直接在用户类上声明的任务非常相似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from locust import User, TaskSet, constant,task


class ForumSection(TaskSet):
wait_time = constant(1)

@task(10)
def job1(self):
print("job1----")

@task
def job2(self):
print("job2+++++")

@task
def stop(self):
self.interrupt()


class LoggedInUser(User):
wait_time = constant(5)
tasks = {ForumSection: 2}

@task
def my_task(self):
print("my_task=======")

执行

1
locust -f test5.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

也可以这样写

1
2
3
4
class MyUser(User):
@task
class MyTaskSet(TaskSet):
...

SequentialTaskSet

SequentialTaskSet是一个任务集,其任务将按照声明的顺序执行。可以在任务集中嵌套顺序任务集,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from locust import User, SequentialTaskSet, constant,task


class MySet(SequentialTaskSet):
wait_time = constant(1)

@task(10)
def job1(self):
print("job1----")

@task
def job2(self):
print("job2+++++")

@task
def stop(self):
print("stop")
self.interrupt()


class LoggedInUser(User):
wait_time = constant(5)
tasks = {MySet: 2}

@task
def my_task(self):
print("my_task=======")

执行

1
locust -f test6.py --autostart --autoquit 0 -u 1 -r 3 --run-time 60s

先后顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from locust import User, SequentialTaskSet, constant, task


class ASet(SequentialTaskSet):
wait_time = constant(1)

@task(2)
def job1(self):
print("a-1")

@task
def job2(self):
print("a-2")

@task
def stop(self):
print("a-stop")
self.interrupt()


class BSet(SequentialTaskSet):
wait_time = constant(1)

@task(2)
def job1(self):
print("b-1")

@task
def job2(self):
print("b-2")

@task
def stop(self):
print("b-stop")
self.interrupt()


class LoggedInUser(User):
wait_time = constant(1)

tasks = {ASet: 2, BSet: 1}

@task
def my_task(self):
print("c-1")

从结果看

a,b,c三者并没有先后的执行循序

ASet和BSet内是按照循序执行的

ASet和BSet执行的概率为2:1

接口压测

在当前目录下创建locustfile.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import random
from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
wait_time = between(1, 3)
host = "http://www.psvmc.cn"

@task
def index_page(self):
self.client.get("/index.html") # 这里的地址需要排除 host 部分
self.client.get("/userlist.json")

@task(3)
def view_item(self):
item_id = random.randint(1, 10000)
self.client.get(f"/search.json?id={item_id}", name="/search.json")

def on_start(self):
self.client.post("/login.json", {"username":"foo", "password":"bar"})

运行

1
locust

注意

Locust 会默认查找当前目录下名为locustfile.py,如果该文件不在当前文件夹或者你取了其他的名字,就需要加上下面的 -f 参数了。

比如

1
locust -f locust_files/my_locust_file.py

网页上访问

http://localhost:8089/

连接WS

压测WS前我们要先看看怎么连接WS

连接WS的库有的是支持异步IO的,项目中我们推荐这样的库,但是压测时还是要选择同步的库

异步

安装依赖

1
pip install websockets

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import websockets
import json
import random


async def mytest():
async with websockets.connect('wss://sockettest.xhkjedu.com/ws') as websocket:
num = random.randint(0, 10000000)
msg = {
"b": {"num": num},
"c": 123456,
}

msgstr = json.dumps(msg)

await websocket.send(msgstr)
print(f"↑: {msgstr}")

greeting = await websocket.recv()
print(f"↓: {greeting}")

asyncio.get_event_loop().run_until_complete(mytest())

同步

官网地址

https://pypi.org/project/websocket-client/

安装

1
pip install websocket-client

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from websocket import create_connection
import json
import random

ws = create_connection("wss://sockettest.xhkjedu.com/ws")

num = random.randint(0, 10000000)
msg = {
"b": {"num": num},
"c": 123456,
}

msgstr = json.dumps(msg)
print("Sending " + msgstr)
ws.send(msgstr)

result = ws.recv()
print("Received '%s'" % result)
ws.close()

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import websocket


def on_message(ws, message):
print(ws)
print(message)


def on_error(ws, error):
print(ws)
print(error)


def on_close(ws):
print(ws)
print("### closed ###")


websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://127.0.0.1:8888/track",
on_message=on_message,
on_error=on_error,
on_close=on_close)

ws.run_forever()

Websocket压测

Jmeter要测试websocket接口,需要先下载安装一个websocket samplers by peter doornbosch的插件

而locust因为是代码实现,所以可以进行任何的测试,引用相应的库即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from locust import User, task, events
import time
from websocket import create_connection
import json
import random


def success_call(name, recvText, total_time):
events.request_success.fire(
request_type="[Success]",
name=name,
response_time=total_time,
response_length=len(recvText)
)


def fail_call(name, total_time, e):
events.request_failure.fire(
request_type="[Fail]",
name=name,
response_time=total_time,
response_length=0,
exception=e,
)


class WebSocketClient(object):
def __init__(self, host):
self.host = host
self.ws = None

def connect(self, burl):
self.ws = create_connection(burl)

def recv(self):
return self.ws.recv()

def send(self, msg):
self.ws.send(msg)


class WebsocketUser(User):
abstract = True

def __init__(self, *args, **kwargs):
super(WebsocketUser, self).__init__(*args, **kwargs)
self.client = WebSocketClient(self.host)
self.client._locust_environment = self.environment


class ApiUser(WebsocketUser):
host = "wss://sockettest.xhkjedu.com/"

@task(1)
def pft(self):
# wss 地址
self.url = 'wss://sockettest.xhkjedu.com/ws'
print("连接前")
start_time = time.time()
try:
self.client.connect(self.url)
print("连接后")
# 发送的订阅请求
num = random.randint(0, 10000000)
msg = {
"b": {"num": num},
"c": 123456,
}

msgstr = json.dumps(msg)

self.client.send(msgstr)
print(f"↑: {msgstr}")

greeting = self.client.recv()
print(f"↓: {greeting}")

except Exception as e:
total_time = int((time.time() - start_time) * 1000)
fail_call("Send", total_time, e)
else:
total_time = int((time.time() - start_time) * 1000)
success_call("Send", "success", total_time)

测试

1
locust -f main.py -u 5000 -r 300

网页上访问

http://localhost:8089/

或者不用图形化界面

1
locust -f main.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

其中

  • --autostart 自动开始 不使用WebUI
  • --autoquit 0autostart搭配使用,测试完成后多长时间退出,后面的数字单位是秒,如果不设置只能CTRL+C才能退出
  • -u 1 最大用户数
  • -r 3 每秒创建的用户数,创建用户数和最大用户数一样后就不再创建
  • --run-time 10s 压测的执行时间