Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding prometheus dashboard support #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 106 additions & 14 deletions raydar/task_tracker/task_tracker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import asyncio
import coolname
import datetime
import itertools
import logging
import os
import pandas as pd
import polars as pl
import ray
from collections.abc import Iterable
import re
import requests
import time
from collections import defaultdict
from dataclasses import dataclass
from packaging.version import Version
from ray.serve import shutdown
from ray.serve.handle import DeploymentHandle
from typing import Dict, List, Optional
from typing import Dict, Iterable, List, Optional

from .schema import schema as default_schema

Expand Down Expand Up @@ -89,6 +94,7 @@ def __init__(
namespace: str,
path: Optional[str] = None,
enable_perspective_dashboard: bool = False,
scrape_prometheus_metrics: bool = False,
):
"""An async Ray Actor Class to track task level metadata.

Expand Down Expand Up @@ -166,6 +172,98 @@ def __init__(
"error_message": "str",
},
)
if scrape_prometheus_metrics:
self.__scraping_job = self.scrape_prometheus_metrics()

def scrape_prometheus_metrics(self):
    """Launch a background Ray task that scrapes Prometheus metrics into perspective tables.

    Provides a helper method to parse Prometheus/OpenMetrics exposition text, a helper
    dataclass, and launches a job which indefinitely scrapes each alive node's
    ``NodeManagerAddress:MetricsExportPort/metrics`` endpoint and updates the
    appropriate perspective tables (one table per metric name) with metric values.

    Returns:
        ray.ObjectRef: Reference to the never-ending remote scraping task.
    """
    from prometheus_client.openmetrics import parser

    @dataclass
    class ParsedOpenMetricsData:
        # One parsed sample line from the exposition text.
        metric_name: str
        metric_description: str
        metric_type: str
        metric_value: str
        metric_metadata: str

    def _parse_response(text):
        """Parse Prometheus exposition text into a list of ParsedOpenMetricsData."""
        parsed_data = []
        metric_name = None
        metric_description = None
        # A sample line emitted before its "# TYPE" line would previously raise
        # NameError; initialize so it degrades to None instead.
        metric_type = None
        for line in text.split("\n"):
            if len(line) == 0:
                continue
            if line.startswith("# HELP "):
                # "# HELP <name> <description...>" -> keep everything after the name.
                metric_description = " ".join(line.split(" ")[3:])
            elif line.startswith("# TYPE "):
                _, _, metric_name, metric_type = line.split(" ")
            else:
                matches = re.search(r".*\{(.*)\}(.*)", line)
                if matches is not None:
                    metric_metadata, metric_value = matches.groups()
                    metric_metadata = parser._parse_labels_with_state_machine(metric_metadata)[0]
                else:
                    _, metric_value = line.split(" ")
                    metric_metadata = dict()
                parsed_data.append(
                    ParsedOpenMetricsData(
                        metric_name=metric_name,
                        metric_description=metric_description,
                        metric_type=metric_type,
                        # float() accepts every Prometheus sample value (including
                        # "+Inf"/"NaN") and avoids eval() on network-fetched data.
                        metric_value=float(metric_value),
                        metric_metadata=metric_metadata,
                    )
                )
        return parsed_data

    @ray.remote
    def _scrape_prometheus_metrics():
        """
        Refreshing every os.environ.get("RAYDAR_PROMETHEUS_METRICS_REFRESH_INTERVAL_S", 2) seconds, attempt to connect to the
        NodeManagerAddress:MetricsExportPort/metrics endpoint and parse the prometheus metrics provided by this endpoint.
        Publish the parsed metrics to the appropriate perspective table (tables are created based on metrics_name).
        """
        metrics = set()  # table names already created on the proxy server
        while True:
            refresh_interval = int(os.environ.get("RAYDAR_PROMETHEUS_METRICS_REFRESH_INTERVAL_S", 2))
            time.sleep(refresh_interval)
            for node in ray.nodes():
                all_values = defaultdict(list)
                if not node.get("Alive", False):
                    continue
                try:
                    response = requests.get(
                        f"http://{node.get('NodeManagerAddress')}:{node.get('MetricsExportPort')}/metrics",
                        timeout=refresh_interval,
                    )
                except requests.RequestException:
                    # A node can become unreachable between ray.nodes() and the
                    # scrape; skip it this cycle instead of killing the task.
                    continue
                if response.status_code != 200:
                    continue
                for parsed_value in _parse_response(response.text):
                    data = dict(
                        metric_name=parsed_value.metric_name,
                        metric_description=parsed_value.metric_description,
                        metric_type=parsed_value.metric_type,
                        metric_value=parsed_value.metric_value,
                        timestamp=datetime.datetime.now(),
                    )
                    for key, value in parsed_value.metric_metadata.items():
                        data[key] = value
                    all_values[parsed_value.metric_name].append(data)

                for key, values in all_values.items():
                    if key not in metrics:
                        metrics.add(key)
                        self.proxy_server.remote("new", key, values)
                    else:
                        self.proxy_server.remote("update", key, values)

    return _scrape_prometheus_metrics.remote()

def get_proxy_server(self):
    """Return this actor's proxy server handle, for building custom perspective visuals.

    Returns:
        This actor's ``proxy_server`` attribute.

    Raises:
        Exception: If no proxy server is active on this task tracker.
    """
    server = self.proxy_server
    if not server:
        raise Exception("This task_tracker has no active proxy_server.")
    return server

def callback(self, tasks: Iterable[ray.ObjectRef]) -> None:
"""A remote function used by this actor's processor actor attribute. Will be called by a separate actor
Expand Down Expand Up @@ -218,7 +316,9 @@ def update_perspective_dashboard(self, completed_tasks) -> None:
That proxy_server serves perspective tables which anticipate the data formats we provide.

Args:
completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the ObjectReferences are neither Running nor Pending Assignment.
completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the
ObjectReferences are neither Running nor Pending Assignment.

"""
data = [
dict(
Expand Down Expand Up @@ -297,14 +397,6 @@ def get_df(self) -> pl.DataFrame:
)
return self.df

def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle:
"""A getter for this actors proxy server attribute. Can be used to create custom perspective visuals.
Returns: this actors proxy_server attribute
"""
if self.proxy_server:
return self.proxy_server
raise Exception("This task_tracker has no active proxy_server.")

def save_df(self) -> None:
"""Saves the internally maintained dataframe of task related information from the ray GCS"""
self.get_df()
Expand All @@ -323,7 +415,7 @@ def clear_df(self) -> None:


class RayTaskTracker:
def __init__(self, name: str = "task_tracker", namespace: str = None, **kwargs):
def __init__(self, name: Optional[str] = "task_tracker", namespace: Optional[str] = None, **kwargs):
"""A utility to construct AsyncMetadataTracker actors.

Wraps several remote AsyncMetadataTracker functions in a ray.get() call for convenience.
Expand Down Expand Up @@ -362,11 +454,11 @@ def get_df(self, process_user_metadata_column=False) -> pl.DataFrame:
return df_with_user_metadata
return df

def save_df(self):
    """Save the dataframe used by this object's AsyncMetadataTracker actor"""
    result_ref = self.tracker.save_df.remote()
    return ray.get(result_ref)

def clear(self):
    """Clear the dataframe used by this object's AsyncMetadataTracker actor"""
    result_ref = self.tracker.clear_df.remote()
    return ray.get(result_ref)

Expand Down
18 changes: 17 additions & 1 deletion raydar/tests/test_task_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def test_construction_and_dataframe(self):
time.sleep(30)
df = task_tracker.get_df()
assert df[["name", "state"]].row(0) == ("do_some_work", "FINISHED")
task_tracker.exit()

def test_get_proxy_server(self):
from raydar.dashboard.server import PerspectiveRayServer
Expand All @@ -37,4 +38,19 @@ def test_get_proxy_server(self):
server.remote("update", "test_table", [dict(a="foo", b=1, c=1.0, d=time.time())])
time.sleep(2)
response = requests.get("http://localhost:8000/tables")
assert eval(response.text) == ["test_table"]
tables = eval(response.text)
assert "test_table" in tables

def test_scrape_prometheus_metrics(self):
    """End-to-end check that the prometheus scraper populates perspective tables."""
    import ast

    task_tracker = RayTaskTracker(
        enable_perspective_dashboard=True,
        scrape_prometheus_metrics=True,
    )
    # Give the scraper several refresh cycles to discover and publish tables.
    time.sleep(30)
    response = requests.get("http://localhost:8000/tables", timeout=10)
    # literal_eval parses the list literal without eval()'s arbitrary-code risk.
    tables = ast.literal_eval(response.text)
    assert len(tables) > 100
    assert "ray_tasks" in tables
    assert "ray_actors" in tables
    assert "ray_resources" in tables
    task_tracker.exit()