Python怎样操作Prometheus?prometheus-client

python应用暴露自定义指标到prometheus的核心是使用prometheus-client库,1. 安装库:pip install prometheus_client;2. 定义指标类型:包括counter(计数器,仅增)、gauge(计量器,可增减)、summary(摘要,客户端计算分位数)和histogram(直方图,服务端计算分位数);3. 使用标签(labels)增加维度,但需避免高基数标签(如用户id、请求id)以防止性能问题;4. 通过start_http_server(端口)启动http服务,在/metrics路径暴露指标;5. 对于短生命周期任务,使用pushgateway推送指标而非拉取模式;6. 遵循命名规范(如snake_case、_total结尾、含单位)并提供清晰描述。此外,可构建自定义exporter监控第三方系统,或结合web框架中间件实现自动化指标采集,所有方案均基于prometheus-client实现。

Python怎样操作Prometheus?prometheus-client

python中操作Prometheus,最直接也最常用的方式就是利用官方提供的

prometheus-client

库。它允许你从Python应用程序中轻松地暴露各种指标,供Prometheus服务器进行抓取。这个库提供了创建、更新不同类型指标的API,并且能启动一个简单的HTTP服务器来暴露这些指标。

解决方案

要开始用Python暴露Prometheus指标,首先你需要安装

prometheus-client

库:

pip install prometheus_client

然后,在你的Python应用中,你可以定义不同类型的指标,并更新它们的值。一个典型的设置会是这样:

立即学习Python免费学习笔记(深入)”;

from prometheus_client import start_http_server, Counter, Gauge, Summary, Histogram import random import time  # 定义一个计数器,用于统计请求总数 REQUEST_COUNT = Counter('my_app_requests_total', 'Total number of requests to my application.')  # 定义一个计量器,用于实时显示某个值(比如当前活跃用户数) ACTIVE_USERS = Gauge('my_app_active_users', 'Current number of active users.')  # 定义一个摘要,用于记录请求处理时间,提供分位数信息 REQUEST_LATENCY_SECONDS = Summary('my_app_request_latency_seconds', 'Request latency in seconds.')  # 定义一个直方图,也用于记录请求处理时间,但提供可配置的桶 REQUEST_DURATION_HISTOGRAM = Histogram('my_app_request_duration_seconds', 'Request duration in seconds histogram.')  def process_request():     """模拟一个处理请求的函数"""     REQUEST_COUNT.inc() # 每次调用,计数器加1      # 随机模拟处理时间     processing_time = random.uniform(0.01, 0.5)     time.sleep(processing_time)      # 更新摘要和直方图     REQUEST_LATENCY_SECONDS.observe(processing_time)     REQUEST_DURATION_HISTOGRAM.observe(processing_time)      # 模拟活跃用户数变化     if random.random() < 0.5:         ACTIVE_USERS.inc()     else:         ACTIVE_USERS.dec()  if __name__ == '__main__':     # 启动一个HTTP服务器,在端口8000暴露指标     # 注意:这通常在一个单独的线程中运行,不会阻塞主应用逻辑     start_http_server(8000)     print("Prometheus metrics server started on port 8000")      # 模拟主应用逻辑持续运行     while True:         process_request()         time.sleep(0.1) # 模拟请求间隔

这段代码展示了如何初始化不同类型的指标,并在一个模拟的业务逻辑中更新它们。

start_http_server(8000)

会启动一个轻量级的HTTP服务,在

http://localhost:8000/metrics

路径下暴露这些指标,Prometheus服务器就可以配置来抓取这个端点。

Python应用如何暴露自定义指标到Prometheus?

当我们在谈论Python应用如何暴露自定义指标时,核心其实就是利用

prometheus-client

提供的四种基本指标类型:

Counter

(计数器)、

Gauge

(计量器)、

Summary

(摘要)和

Histogram

(直方图)。理解它们各自的用途是关键。

Counter

是最简单的,它只能增加,不能减少。我通常用它来记录那些只会单向增长的事件,比如总的请求次数、错误发生的次数。一旦定义,你只需要调用

.inc()

(默认加1)或者

.inc(amount)

来增加它的值。我个人觉得,对于任何“总量”性质的统计,

Counter

都是首选。

Gauge

则不同,它能增能减,能设任意值。这让它非常适合表示那些可以上下波动的量,比如当前的CPU使用率、内存占用、队列长度,甚至是活跃的用户数。你可以用

.inc()

.dec()

.set(value)

来操作它。有时候,我甚至会用它来暴露一些配置参数,虽然这不是它的主要设计目的,但确实能帮助调试。

Summary

Histogram

都用于测量事件的持续时间或大小分布。它们之间的区别,我简单理解就是:

Summary

给你的是分位数(比如P99、P95),并且它会计算总和和总数;而

Histogram

则将数据放入预定义的桶中,然后你可以通过这些桶来推断分位数,同时也能计算总和和总数。选择哪个,很大程度上取决于你对数据精度的要求以及后续查询的习惯。

Summary

在客户端计算分位数,对于高基数(label组合多)的场景,可能会消耗更多客户端资源。

Histogram

则在服务器端(Prometheus)计算分位数,通过

histogram_quantile

函数,这在处理大量数据时通常更灵活,也更适合聚合。对我而言,如果对精确分位数要求高,或者数据范围变化大,我更倾向于使用

Histogram

,因为它提供了更细粒度的控制,可以自定义桶的边界。

在实际操作中,指标的命名规范也非常重要。Prometheus社区有一些约定俗成的规则,比如使用

snake_case

,以

_total

结尾表示计数器,包含单位(如

_seconds

_bytes

)。良好的命名习惯不仅让你的指标更易读,也能让其他人更容易理解你的系统状态。

from prometheus_client import Counter, Gauge, Summary, Histogram, generate_latest from http.server import BaseHTTPRequestHandler, HTTPServer import time import random  # 带标签的计数器,可以按不同的HTTP方法统计 HTTP_REQUESTS_TOTAL = Counter(     'http_requests_total',     'Total HTTP requests received',     ['method', 'endpoint'] # 定义标签 )  # 计量器,用于表示一个瞬时值,例如缓存命中率 CACHE_HIT_RATIO = Gauge('cache_hit_ratio', 'Current cache hit ratio')  # 直方图,用于记录API响应时间,并自定义桶 API_RESPONSE_TIME_SECONDS = Histogram(     'api_response_time_seconds',     'API response time in seconds',     buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, float('inf')] )  class MetricsHandler(BaseHTTPRequestHandler):     def do_GET(self):         if self.path == '/metrics':             self.send_response(200)             self.send_header('Content-type', 'text/plain; version=0.0.4; charset=utf-8')             self.end_headers()             self.wfile.write(generate_latest()) # 获取所有注册的指标数据         else:             self.send_response(404)             self.end_headers()  def run_mock_api():     """模拟一个带指标更新的API服务"""     while True:         method = random.choice(['GET', 'POST', 'PUT'])         endpoint = random.choice(['/users', '/products', '/orders'])          # 更新计数器,带标签         HTTP_REQUESTS_TOTAL.labels(method=method, endpoint=endpoint).inc()          # 模拟API处理时间         response_time = random.uniform(0.01, 1.0)         API_RESPONSE_TIME_SECONDS.observe(response_time)          # 随机更新缓存命中率         CACHE_HIT_RATIO.set(random.uniform(0.7, 0.99))          time.sleep(random.uniform(0.1, 0.5))  if __name__ == '__main__':     # 在单独的线程中启动HTTP服务器     from threading import Thread     metrics_server_thread = Thread(target=lambda: HTTPServer(('0.0.0.0', 8000), MetricsHandler).serve_forever())     metrics_server_thread.daemon = True # 设置为守护线程,主程序退出时自动退出     metrics_server_thread.start()     print("Metrics server running on http://localhost:8000/metrics")      # 运行模拟API     run_mock_api() 

这段代码展示了如何使用标签(labels)来增加指标的维度,这在Prometheus中非常强大,可以让你根据不同的维度(比如HTTP方法、API端点)来切分和聚合数据。同时,也展示了如何自定义

Histogram

的桶,以及手动设置HTTP服务器来暴露指标,这在某些更复杂的场景下可能会用到。

采集Python应用指标时有哪些常见陷阱和优化建议?

在使用

prometheus-client

来暴露Python应用的指标时,确实会遇到一些小坑,或者说,有一些地方需要我们特别注意,才能让监控系统既高效又实用。我个人在实践中总结了几点:

首先,最常见的陷阱就是高基数(High Cardinality)问题。如果你在指标的标签中使用了像用户ID、请求ID、会话ID或者时间戳这类非常动态且唯一的值,那么Prometheus在存储和查询这些指标时会面临巨大的压力。每一个独特的标签组合都会被视为一个新的时间序列。想象一下,如果你的应用每秒处理数千个请求,每个请求都有一个唯一的ID作为标签,那Prometheus的存储空间很快就会爆炸,查询也会变得异常缓慢。我记得有一次,我们不小心把一个日志ID加到了某个核心指标的标签里,结果不到半天,Prometheus的磁盘使用率就飙升了。所以,我的建议是:对标签的设计要非常谨慎,只使用那些具有业务意义且基数有限的维度,比如HTTP方法、状态码、API路径(但要避免包含动态参数的路径)、服务版本、部署区域等。

其次,是关于指标更新的频率与性能。虽然

prometheus-client

本身设计得很高效,但如果你在非常热的代码路径(比如每个微秒都在执行的循环里)频繁地更新大量指标,这仍然可能对应用本身的性能造成轻微影响。通常,这并不是一个大问题,因为Python的GIL(全局解释器锁)会限制并发,而且指标更新操作本身很快。但如果你的应用对延迟极其敏感,或者指标数量真的非常庞大,你可能需要考虑批量更新,或者只在关键路径上进行更新。不过,说实话,对于大多数Web应用和后台服务,这种性能开销通常可以忽略不计。

再来就是短生命周期任务的指标暴露

prometheus-client

默认是启动一个HTTP服务器,等待Prometheus来抓取。但如果你的python脚本是一个短生命周期的任务(比如一个定时脚本,运行几秒钟就结束了),Prometheus可能还没来得及抓取,你的脚本就已经退出了。这种情况下,传统的拉取模式就不太适用。解决方案通常是使用Prometheus Pushgateway。你的Python脚本可以将指标推送到Pushgateway,然后Prometheus从Pushgateway抓取。这为短生命周期任务提供了一个非常优雅的监控方式。

from prometheus_client import push_to_gateway, Counter import random import time  # 定义一个计数器 BATCH_JOB_RUNS_TOTAL = Counter('batch_job_runs_total', 'Total runs of the batch job.') BATCH_JOB_FAILURES_TOTAL = Counter('batch_job_failures_total', 'Total failures of the batch job.')  def run_batch_job():     """模拟一个批处理任务"""     BATCH_JOB_RUNS_TOTAL.inc()     print("Batch job started...")      # 模拟任务执行时间     time.sleep(random.uniform(1, 5))      if random.random() < 0.2: # 20% 失败率         BATCH_JOB_FAILURES_TOTAL.inc()         print("Batch job failed!")     else:         print("Batch job completed successfully.")      # 任务结束后,将指标推送到Pushgateway     # job参数是必须的,用于标识这个批处理任务     # 可以添加分组键(groupingkey)来进一步区分,比如按实例ID     push_to_gateway('localhost:9091', job='my_python_batch_job', grouping_key={'instance': 'my_server_1'}, registry=None)     print("Metrics pushed to Pushgateway.")  if __name__ == '__main__':     # 假设Pushgateway运行在localhost:9091     run_batch_job()

这段代码展示了如何使用

push_to_gateway

将指标推送到Pushgateway。

registry=None

表示推送所有默认注册的指标。

job

参数是Prometheus用来识别这个任务的名称,

grouping_key

则可以让你为同一

job

下的指标添加额外的维度,比如区分不同的实例。

最后,一个小的优化点是,确保你的指标描述(docstring)清晰明了。这不仅方便你自己,也方便团队里的其他人理解这些指标的含义。一个好的指标描述能省去很多沟通成本。

除了标准库,还有哪些方法可以与Prometheus集成?

当提到Python与Prometheus的集成,

prometheus-client

无疑是官方且最直接的方式。但“除了标准库”这个说法,其实可以理解为除了

prometheus-client

这个“标准”的Python客户端库之外,还有没有其他思路或工具。从这个角度来看,答案是肯定的,主要可以归纳为以下几类:

首先,是Prometheus Pushgateway,前面已经提到了。它并不是一个替代

prometheus-client

的方案,而是

prometheus-client

的一种补充用法。当你的Python应用是短生命周期的脚本、批处理任务或者无法被Prometheus直接抓取(例如在严格防火墙后)时,Pushgateway就显得尤为重要。你的python程序不再是等待被拉取,而是主动将指标“推”给Pushgateway,由Pushgateway再暴露给Prometheus抓取。这对于那些跑完就退出的任务,或者需要聚合来自多个短暂实例的指标,是非常实用的。

其次,是自定义Prometheus Exporter。虽然

prometheus-client

可以让你在Python应用内部暴露指标,但有时候你可能需要监控一个非Python服务,或者一个没有内置Prometheus指标暴露能力的第三方系统。在这种情况下,你可以用Python编写一个独立的程序,作为该服务的“Prometheus Exporter”。这个Exporter会连接到目标服务(比如数据库、消息队列、操作系统API等),获取其内部状态数据,然后将这些数据转换成Prometheus可识别的指标格式,并通过自身的HTTP端口暴露出来。这个Exporter本身通常也会使用

prometheus-client

来暴露它自己的内部运行状态指标(比如它处理了多少次请求,连接目标服务花了多长时间等)。这种方式的优点是高度灵活,可以监控任何你能通过Python访问到的数据源。

举个例子,假设你想监控一个没有Prometheus接口的遗留系统。你可以写一个Python脚本:

  1. 连接到那个遗留系统(可能是通过ssh、REST API、数据库查询等)。
  2. 解析获取到的数据。
  3. 使用
    prometheus-client

    将这些数据转换为Prometheus指标。

  4. 启动一个HTTP服务器,暴露这些指标。
from prometheus_client import Gauge, start_http_server import time import random  # 假设这是从某个遗留系统获取到的数据 LEGACY_SYSTEM_STATUS = Gauge('legacy_system_status', 'Status of the legacy system (1=ok, 0=error)') LEGACY_SYSTEM_QUEUE_DEPTH = Gauge('legacy_system_queue_depth', 'Current depth of the legacy system queue')  def fetch_legacy_data():     """模拟从遗留系统获取数据"""     # 实际场景中,这里会是调用API、SSH命令或数据库查询     status = 1 if random.random() > 0.1 else 0 # 10% 几率出错     queue_depth = random.randint(0, 100)     return status, queue_depth  def run_exporter():     while True:         status, queue_depth = fetch_legacy_data()         LEGACY_SYSTEM_STATUS.set(status)         LEGACY_SYSTEM_QUEUE_DEPTH.set(queue_depth)         print(f"Fetched legacy data: Status={status}, QueueDepth={queue_depth}")         time.sleep(10) # 每10秒更新一次  if __name__ == '__main__':     start_http_server(8001) # Exporter在8001端口暴露     print("Legacy system exporter started on port 8001")     run_exporter()

这个简单的例子展示了如何构建一个独立的Exporter。它定期从一个模拟的“遗留系统”获取数据,并将其转换为Prometheus指标。

最后,一些高级框架或库的集成。虽然这本质上还是基于

prometheus-client

,但它们可能提供更高级别的抽象,让你在特定的Python Web框架(如flask, django, fastapi)中更方便地集成指标。例如,有些库会提供中间件,自动帮你记录HTTP请求的计数、延迟等,而无需你手动在每个路由中添加代码。这些库通常会封装

prometheus-client

的调用,提供更“开箱即用”的体验。它们并非完全不同的集成方式,而是让

prometheus-client

在特定生态系统中用起来更顺手。但归根结底,核心逻辑依然是

prometheus-client

在背后默默工作。

© 版权声明
THE END
喜欢就支持一下吧
点赞6 分享