Python知識分享網(wǎng) - 專業(yè)的Python學(xué)習(xí)網(wǎng)站 學(xué)Python,上Python222
【Python】Locust持續(xù)優(yōu)化:InfluxDB與Grafana實現(xiàn)數(shù)據(jù)持久化與可視化分析
發(fā)布于:2023-07-12 12:58:46

前言

在進行性能測試時,我們需要對測試結(jié)果進行監(jiān)控和分析,以便于及時發(fā)現(xiàn)問題并進行優(yōu)化。

Locust在內(nèi)存中維護了一個時間序列數(shù)據(jù)結(jié)構(gòu),用于存儲每個事件的統(tǒng)計信息。 這個數(shù)據(jù)結(jié)構(gòu)允許我們在Charts標簽頁中查看不同時間點的性能指標,但是正因為Locust WebUI上展示的數(shù)據(jù)實際上是存儲在內(nèi)存中的。所以在Locust測試結(jié)束后,這些數(shù)據(jù)將不再可用。 如果我們需要長期保存以便后續(xù)分析測試數(shù)據(jù),可以考慮將Locust的測試數(shù)據(jù)上報到外部的數(shù)據(jù)存儲系統(tǒng),如InfluxDB,并使用Grafana等可視化工具進行展示和分析。

本文將介紹如何使用Locust進行負載測試,并將測試數(shù)據(jù)上報到InfluxDB。同時,我們將使用Grafana對測試數(shù)據(jù)進行展示和分析。

最終效果:

【Python】Locust持續(xù)優(yōu)化:InfluxDB與Grafana實現(xiàn)數(shù)據(jù)持久化與可視化分析  圖1

 

influxDB

InfluxDB是一款開源的時間序列數(shù)據(jù)庫,專為處理大量的時間序列數(shù)據(jù)而設(shè)計。時間序列數(shù)據(jù)通常是按照時間順序存儲的數(shù)據(jù)點,每個數(shù)據(jù)點都包含一個時間戳和一個或多個與之相關(guān)的值。這種數(shù)據(jù)類型在許多場景下都非常常見,如監(jiān)控系統(tǒng)、物聯(lián)網(wǎng)設(shè)備、金融市場數(shù)據(jù)等。在這些場景下,數(shù)據(jù)上報是一種關(guān)鍵的需求,因為它可以幫助我們實時了解系統(tǒng)的狀態(tài)和性能。

注: InfluxDB 開源的時單機版本,集群版本并未開元,但是對于普通用戶的日常場景已經(jīng)完全夠用。

以下是關(guān)于InfluxDB的關(guān)鍵特性和優(yōu)勢的表格:

【Python】Locust持續(xù)優(yōu)化:InfluxDB與Grafana實現(xiàn)數(shù)據(jù)持久化與可視化分析 圖2

 

安裝運行InfluxDB

如果你已經(jīng)安裝了Docker,可以直接使用官方的InfluxDB鏡像來運行InfluxDB:

 

docker run -p 8086:8086 -v $PWD:/var/lib/influxdb influxdb:1.8

 

使用Python 上報數(shù)據(jù)到influxdb

首先,確保已經(jīng)安裝了influxdb庫:

 

pip install influxdb

 

然后,使用以下代碼上報數(shù)據(jù)到InfluxDB:
以下是一個使用Python操作InfluxDB上報數(shù)據(jù)的示例,對照MySQL進行注釋:

 

import time
from influxdb import InfluxDBClient

# 連接到InfluxDB(類似于連接到MySQL數(shù)據(jù)庫)
client = InfluxDBClient(host='localhost', port=8086)

# 創(chuàng)建數(shù)據(jù)庫(類似于在MySQL中創(chuàng)建一個新的數(shù)據(jù)庫)
client.create_database('mydb')

# 切換到創(chuàng)建的數(shù)據(jù)庫(類似于在MySQL中選擇一個數(shù)據(jù)庫)
client.switch_database('mydb')

# 上報數(shù)據(jù)(類似于在MySQL中插入一條記錄)
data = [
    {
        # 在InfluxDB中,measurement相當(dāng)于MySQL中的表名
        "measurement": "cpu_load",
        # tags相當(dāng)于MySQL中的索引列,用于快速查詢
        "tags": {
            "host": "server01",
            "region": "us-west"
        },
        # time為時間戳,是InfluxDB中的關(guān)鍵字段
        "time": int(time.time_ns()),
        # fields相當(dāng)于MySQL中的數(shù)據(jù)列,用于存儲實際的數(shù)據(jù)值
        "fields": {
            "value": 0.64
        }
    }
]

# 寫入數(shù)據(jù)(類似于在MySQL中執(zhí)行INSERT語句)
client.write_points(data)

 

在這個示例中,我們首先連接到InfluxDB(類似于連接到MySQL數(shù)據(jù)庫),然后創(chuàng)建一個名為mydb的數(shù)據(jù)庫(類似于在MySQL中創(chuàng)建一個新的數(shù)據(jù)庫),并切換到創(chuàng)建的數(shù)據(jù)庫(類似于在MySQL中選擇一個數(shù)據(jù)庫)。接著,我們準備了一條名為cpu_load的數(shù)據(jù)(在InfluxDB中,measurement相當(dāng)于MySQL中的表名),并為數(shù)據(jù)添加了hostregion標簽(類似于MySQL中的索引列)。最后,我們將數(shù)據(jù)寫入到InfluxDB中(類似于在MySQL中執(zhí)行INSERT語句)。

執(zhí)行上面的代碼后我們可以看到我們的操作成功了:

在這個示例中,我們首先連接到InfluxDB(類似于連接到MySQL數(shù)據(jù)庫),然后創(chuàng)建一個名為mydb的數(shù)據(jù)庫(類似于在MySQL中創(chuàng)建一個新的數(shù)據(jù)庫),并切換到創(chuàng)建的數(shù)據(jù)庫(類似于在MySQL中選擇一個數(shù)據(jù)庫)。接著,我們準備了一條名為cpu_load的數(shù)據(jù)(在InfluxDB中,measurement相當(dāng)于MySQL中的表名),并為數(shù)據(jù)添加了host和region標簽(類似于MySQL中的索引列)。最后,我們將數(shù)據(jù)寫入到InfluxDB中(類似于在MySQL中執(zhí)行INSERT語句)。  執(zhí)行上面的代碼后我們可以看到我們的操作成功了: 圖3

如果我們安裝了influx-cli就可以在命令行中直接查詢剛才寫入的數(shù)據(jù):

 

bingohe@MacBook-Pro ~ $ /usr/local/Cellar/influxdb@1/1.11.1/bin/influx 
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.11.1
> show databases;
name: databases
name
----
_internal
mydb
> use mydb
Using database mydb
> show measurements;
name: measurements
name
----
cpu_load
> select * from cpu_load;
name: cpu_load
time                host     region  value
----                ----     ------  -----
1688874870046897000 server01 us-west 0.64

 

【Python】Locust持續(xù)優(yōu)化:InfluxDB與Grafana實現(xiàn)數(shù)據(jù)持久化與可視化分析 圖4

 

Locust 數(shù)據(jù)寫入到 influx

在 【Python】萬字長文,Locust 性能測試指北(上) 中我們提到過Locust的生命周期,我們也通過Locust生命周期實現(xiàn)了集合點的功能。現(xiàn)在我們一起來通過它實現(xiàn)測試數(shù)據(jù)的實時展示。

Locust的生命周期

  1. test_start:測試開始時觸發(fā)。
  2. spawning_start:生成用戶時觸發(fā)。
  3. user_add:每個用戶被添加時觸發(fā)。
  4. spawning_complete:所有用戶生成完成時觸發(fā)。
  5. request:每個請求發(fā)生時觸發(fā)。
  6. test_stop:測試停止時觸發(fā)。

上報數(shù)據(jù)

我們先來看看常用的事件里面可以獲取到的數(shù)據(jù):

 

import time
from locust import HttpUser, task, between, events


@events.request.add_listener
def request_handler(*args, **kwargs):
    print(f"request args: {args}")
    print(f"request kwargs: {kwargs}")


@events.worker_report.add_listener
def worker_report_handlers(*args, **kwargs):
    print(f"worker_report args: {args}")
    print(f"worker_report kwargs: {kwargs}")


@events.test_start.add_listener
def test_start_handlers(*args, **kwargs):
    print(f"test_start args: {args}")
    print(f"test_start kwargs: {kwargs}")


@events.test_stop.add_listener
def test_stop_handlers(*args, **kwargs):
    print(f"test_stop args: {args}")
    print(f"test_stop kwargs: {kwargs}")


class QuickstartUser(HttpUser):
    wait_time = between(1, 2)

    @task
    def root(self):
        with self.client.get("/", json={"time": time.time()}, catch_response=True) as rsp:
            rsp_json = rsp.json()
            if rsp_json["id"] != 5:
                # 失敗時上報返回的數(shù)據(jù)
                rsp.failure(f"{rsp_json}")

 

運行一次測試時能看到這些生命周期內(nèi)的Locust 對外暴露的數(shù)據(jù):

 

test_start args: ()
test_start kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
request args: ()
request kwargs: {'request_type': 'GET', 'response_time': 2.6886250000011103, 'name': '/', 'context': {}, 'response': <Response [200]>, 'exception': None, 'start_time': 1688888321.896039, 'url': 'http://0.0.0.0:10000/', 'response_length': 8}
request args: ()
request kwargs: {'request_type': 'GET', 'response_time': 2.735957999998817, 'name': '/', 'context': {}, 'response': <Response [200]>, 'exception': CatchResponseError("{'id': 6}"), 'start_time': 1688888323.421389, 'url': 'http://0.0.0.0:10000/', 'response_length': 8}
test_stopping args: ()
test_stopping kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
test_stop args: ()
test_stop kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}

 

從上面的監(jiān)控我們可以看到,每次任務(wù)啟動和停止的時候會分別調(diào)用@events.test_start.add_listener@events.test_stop.add_listener裝飾的函數(shù),每次請求發(fā)生的的時候都會調(diào)用@events.request.add_listener 監(jiān)聽器裝飾的函數(shù),我們就是要利用這一點來進行數(shù)據(jù)的上報。

通過查看 Locust 的 EventHook 源碼注釋我們可以看到標準的使用方法:

 

#.../site-packages/locust/event.py
...
class EventHook:
    """
    Simple event class used to provide hooks for different types of events in Locust.

    Here's how to use the EventHook class::

        my_event = EventHook()
        def on_my_event(a, b, **kw):
            print("Event was fired with arguments: %s, %s" % (a, b))
        my_event.add_listener(on_my_event)
        my_event.fire(a="foo", b="bar")

    If reverse is True, then the handlers will run in the reverse order
    that they were inserted
    """
...

 

結(jié)合前面的寫數(shù)據(jù)到 influxDB的實現(xiàn),上報數(shù)據(jù)這一項一下子就變簡單了:

簡單實現(xiàn)每次請求數(shù)據(jù)上報 到 influxDB

下面的代碼運行Locust測試后會自動創(chuàng)建一個locust_requests的 measurement,然后將每次請求的數(shù)據(jù)上報。

 

import time
from datetime import datetime
from influxdb import InfluxDBClient

from locust import HttpUser, task, between, events

client = InfluxDBClient(host='localhost', port=8086, database="mydb")

def request(request_type, name, response_time, response_length, response, context, exception, url, start_time):
    _time = datetime.utcnow()
    was_successful = True
    if response:
        was_successful = 199 < response.status_code < 400
    tags = {
        'request_type': request_type,
        'name': name,
        'success': was_successful,
        'exception': str(exception),
    }
    fields = {
        'response_time': response_time,
        'response_length': response_length,
    }
    data = {"measurement": 'locust_requests', "tags": tags, "time": _time, "fields": fields}
    client.write_points([data])

# 在每次請求的時候通過前面定義的request函數(shù)寫數(shù)據(jù)到 DB
events.request.add_listener(request)


class QuickstartUser(HttpUser):
    wait_time = between(1, 2)

    @task
    def root(self):
        with self.client.get("/", json={"time": time.time()}, catch_response=True) as rsp:
            rsp_json = rsp.json()
            if rsp_json["id"] != 5:
                rsp.failure(f"{rsp_json}")

 

上報的數(shù)據(jù) influxDB 中查詢到:

優(yōu)化升級

上面的這個上報很粗糙,每次請求會上報一次數(shù)據(jù),會影響實際的壓測,如果我們將要上報的數(shù)據(jù)放在一個數(shù)據(jù)結(jié)構(gòu)中中,異步的上報這個數(shù)據(jù)將極大的提升性能

 

# 將 __flush_points 方法中的寫入操作放到一個單獨的線程中,避免阻塞主線程,提高性能。
self.write_thread = threading.Thread(target=self.__write_points_worker)

# 批量寫入
if len(self.write_batch) >= self.batch_size or time.time() - self.last_flush_time >= self.interval_ms / 1000:

# 使用 gzip 壓縮上報的數(shù)據(jù)
influxdb_writer = InfluxDBWriter('localhost', 8086, 'mydb', batch_size=1000, gzip_enabled=True)
...

 

 

配置Grafana

在測試數(shù)據(jù)被上報到InfluxDB之后,可以通過Grafana進行數(shù)據(jù)展示和分析。需要先在Grafana中配置InfluxDB數(shù)據(jù)源,然后創(chuàng)建相應(yīng)的圖表和儀表盤。

在創(chuàng)建圖表和儀表盤時,可以選擇InfluxDB作為數(shù)據(jù)源,并使用InfluxQL查詢語言進行數(shù)據(jù)查詢和過濾??梢愿鶕?jù)需要選擇不同的圖表類型和顯示方式,以展示測試結(jié)果數(shù)據(jù)的趨勢和變化。

 

總結(jié)

本文介紹了如何將Locust測試數(shù)據(jù)上報到InfluxDB,并通過Grafana進行展示和分析。通過將測試數(shù)據(jù)與監(jiān)控工具相結(jié)合,可以更好地了解系統(tǒng)的性能和穩(wěn)定性,及時發(fā)現(xiàn)問題并進行優(yōu)化,也可以方便后續(xù)進行測試數(shù)據(jù)分析。希望本文能對大家有所幫助。

轉(zhuǎn)載自:https://www.cnblogs.com/Detector/p/17538422.html#_label1