-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitoring_tool.py
More file actions
94 lines (78 loc) · 3.22 KB
/
monitoring_tool.py
File metadata and controls
94 lines (78 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import time
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, make_asgi_app
from pydantic import BaseModel
# Configuration for monitoring
class MonitoringConfig(BaseModel):
    """Tunable settings for the monitoring service.

    Attributes are plain pydantic fields; defaults are deep-copied per
    instance by pydantic, so the mutable list default is safe here.
    """
    # URL path where the Prometheus scrape endpoint is mounted.
    prometheus_endpoint: str = "/metrics"
    # Upper bounds (seconds) of the latency histogram buckets.
    latency_buckets: list[float] = [0.1, 0.5, 1.0, 2.0, 5.0]  # Histogram buckets for latency
# Initialize Prometheus Metrics
class PrometheusMetrics:
    """Registers and holds the Prometheus collectors used by the app.

    Instantiate exactly once per process: prometheus_client raises on
    duplicate metric names in the default registry.
    """

    def __init__(self, config: MonitoringConfig):
        request_labels = ("method", "endpoint")
        error_labels = ("method", "endpoint", "status_code")

        # Total requests served, partitioned by HTTP method and path.
        self.REQUEST_COUNT = Counter(
            "request_count",
            "Total number of requests",
            request_labels,
        )
        # Per-endpoint latency distribution; bucket edges come from config.
        self.REQUEST_LATENCY = Histogram(
            "request_latency_seconds",
            "Request latency in seconds",
            ["endpoint"],
            buckets=config.latency_buckets,
        )
        # Errors, additionally partitioned by response status code.
        self.ERROR_COUNT = Counter(
            "error_count",
            "Total number of errors",
            error_labels,
        )
# Initialize FastAPI and Prometheus
# Module-level singletons: `config`, `app`, and `metrics` are referenced
# by the middleware and route handlers below.
config = MonitoringConfig()
app = FastAPI(title="Hybrid Search Monitoring Tool", version="1.0.0")
metrics = PrometheusMetrics(config)
# Expose Prometheus metrics endpoint
# make_asgi_app() serves the default registry; mounting it makes the
# collectors in `metrics` scrapeable at config.prometheus_endpoint.
prometheus_app = make_asgi_app()
app.mount(config.prometheus_endpoint, prometheus_app)
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """
    Middleware to track request metrics including count, latency, and errors.

    Records REQUEST_COUNT and REQUEST_LATENCY for every request, including
    those that raise, and increments ERROR_COUNT both for raised exceptions
    (counted as 500) and for responses whose status code is >= 500.
    """
    # perf_counter is monotonic; time.time() can jump with clock adjustments.
    start_time = time.perf_counter()
    method = request.method
    endpoint = request.url.path
    try:
        response = await call_next(request)
    except Exception:
        # Unhandled exception: still account for the request and its
        # latency, then re-raise with the original traceback (bare raise).
        metrics.REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
        metrics.REQUEST_LATENCY.labels(endpoint=endpoint).observe(
            time.perf_counter() - start_time
        )
        metrics.ERROR_COUNT.labels(
            method=method, endpoint=endpoint, status_code=500
        ).inc()
        raise
    # Success path: count the request and observe its latency.
    metrics.REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
    metrics.REQUEST_LATENCY.labels(endpoint=endpoint).observe(
        time.perf_counter() - start_time
    )
    # Handlers that *return* a 5xx (rather than raising) were previously
    # never counted as errors; count them here using the real status code.
    if response.status_code >= 500:
        metrics.ERROR_COUNT.labels(
            method=method, endpoint=endpoint, status_code=response.status_code
        ).inc()
    return response
@app.get("/")
async def root():
    """
    Health-check endpoint: responds with a static status message.
    """
    payload = {"message": "Hybrid Search Monitoring Tool is running."}
    return payload
@app.get("/analytics/usage")
async def get_usage_metrics():
    """
    Endpoint to retrieve simulated API usage metrics (example implementation).

    Returns the aggregate request total, error total, and the label sets
    of all endpoints seen so far.
    """

    def _total(counter) -> float:
        # Sum child values via the public collect() API. The previous code
        # read `counter._value`, which does not exist on a *labeled* Counter
        # (children are stored in `_metrics`) and raised AttributeError.
        return sum(
            sample.value
            for family in counter.collect()
            for sample in family.samples
            if sample.name.endswith("_total")
        )

    return {
        "total_requests": _total(metrics.REQUEST_COUNT),
        "error_count": _total(metrics.ERROR_COUNT),
        # NOTE(review): `_metrics` is still a private attribute; kept to
        # preserve the response shape (list of label-value tuples).
        "endpoints_tracked": list(metrics.REQUEST_COUNT._metrics.keys()),
    }
# ---------------------------------------------------------------
# What We Did:
# ---------------------------------------------------------------
# - Modularized Prometheus metric definitions using a class.
# - Added configurable histogram buckets for request latency.
# - Improved middleware to match a reusable, extensible format.
# ---------------------------------------------------------------
# What's Next:
# ---------------------------------------------------------------
# - Introduce a logging and alerting service for real-time error tracking.
# - Build a Prometheus PushGateway integration for dynamic environments.
# - Develop a Slack or PagerDuty alerting system for critical thresholds.
# ---------------------------------------------------------------