"""Provides the various means of collecting machine statistics."""
# © 2023 National Technology & Engineering Solutions of Sandia, LLC
# (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the
# U.S. Government retains certain rights in this software.
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import os
from abc import abstractmethod
from multiprocessing import Manager, Process
from pathlib import Path
from time import sleep, time
from typing import TYPE_CHECKING
from .abstract_method import AbstractMethod
if TYPE_CHECKING:
from multiprocessing.managers import SyncManager
try:
import psutil
except ModuleNotFoundError:
psutil = None
[docs]
def stats_collectors(**kwargs) -> list[StatsCollector]:
"""
Generate stats collectors.
A factory method that returns a list of any subclasses of
:class:`StatsCollector` that have the ``@StatsCollector.subclass``
decorator applied to them.
Parameters:
**kwargs: Any supported arguments of the
:class:`StatsCollector` subclasses.
Returns:
A collection of instances of :class:`StatsCollector` subclasses.
"""
collectors = []
if "measure" in kwargs:
interval = kwargs.get("interval", 1.0)
manager = Manager()
for collector in StatsCollector.subclasses:
if collector.stat_name in kwargs["measure"]:
collectors.append(collector(interval, manager))
return collectors
[docs]
class StatsCollector:
"""
Collect statistics while running command in the shell.
Provides an interface for the :class:`ShellLogger` to run commands
while collecting various system statistics.
"""
stat_name = "undefined" # Should be defined by subclasses.
subclasses = [] # noqa: RUF012
[docs]
@staticmethod
def subclass(stats_collector_subclass: type):
"""
Mark a class as being a supported stats collector.
This is a class decorator that adds to a list of supported
:class:`StatsCollector` classes for the :func:`stats_collectors`
factory method.
"""
if issubclass(stats_collector_subclass, StatsCollector):
StatsCollector.subclasses.append(stats_collector_subclass)
return stats_collector_subclass
def __init__(self, interval: float, _: SyncManager):
"""
Initialize the :class:`StatsCollector` object.
Set the poling interval, and create the process for collecting
the statistics.
Parameters:
interval: How long to sleep between collecting statistics.
Note:
A ``SyncManager`` will be supplied at the second argument
for any subclasses.
"""
self.interval = interval
self.process = Process(target=self.loop, args=())
[docs]
def start(self):
"""
Start a subprocess.
Poll at a certain interval for certain statistics.
"""
self.process.start()
[docs]
def loop(self):
"""
Loop while collecting statistics.
Infinitely loop, collecting statistics, until the subprocess is
terminated.
"""
while True:
self.collect()
sleep(self.interval)
[docs]
@abstractmethod
def collect(self):
"""
Instantaneously collect a statistic.
This is meant to be called repeatedly after some time interval.
Raises:
AbstractMethod: This must be overridden by subclasses.
"""
raise AbstractMethod
[docs]
@abstractmethod
def unproxied_stats(self):
"""
Convert to standard Python data types.
Convert from Python's Manager's data structures to base Python
data structures.
Raises:
AbstractMethod: This must be overridden by subclasses.
"""
raise AbstractMethod
[docs]
def finish(self):
"""
Stop collecting statistics.
Terminate the infinite loop that's collecting the statistics,
and then return the unproxied statistics.
"""
self.process.terminate()
return self.unproxied_stats()
if psutil is not None:
@StatsCollector.subclass
class DiskStatsCollector(StatsCollector):
"""
Collect disk usage statistics.
A means of running commands while collecting disk usage
statistics.
"""
stat_name = "disk"
def __init__(self, interval: float, manager: SyncManager) -> None:
"""
Initialize the :class:`DiskStatsCollector` object.
Parameters:
interval: How many seconds to sleep between polling.
manager: The multiprocessing manager used to control
the process used to collect the statistics.
"""
super().__init__(interval, manager)
self.stats = manager.dict()
self.mount_points = [
p.mountpoint for p in psutil.disk_partitions()
]
for location in [
"/tmp",
"/dev/shm",
f"/var/run/user/{os.getuid()}",
]:
if (
location not in self.mount_points
and Path(location).exists()
):
self.mount_points.append(location)
for m in self.mount_points:
self.stats[m] = manager.list()
def collect(self) -> None:
"""Poll the disks to determine how much free space they have."""
milliseconds_per_second = 10**3
timestamp = round(time() * milliseconds_per_second)
for m in self.mount_points:
self.stats[m].append((timestamp, psutil.disk_usage(m).percent))
def unproxied_stats(self) -> dict:
"""
Convert the statistics to standard Python data types.
Translate the statistics from the multiprocessing
``SyncManager`` 's data structure to a ``dict``.
Returns:
A mapping from the disk mount points to tuples of
timestamps and percent of disk space free.
"""
return {k: list(v) for k, v in self.stats.items()}
@StatsCollector.subclass
class CPUStatsCollector(StatsCollector):
"""
Collect CPU statistics.
A means of running commands while collecting CPU usage
statistics.
"""
stat_name = "cpu"
def __init__(self, interval: float, manager: SyncManager) -> None:
"""
Initialize the :class:`CPUStatsCollector` object.
Parameters:
interval: How many seconds to sleep between polling.
manager: The multiprocessing manager used to control
the process used to collect the statistics.
"""
super().__init__(interval, manager)
self.stats = manager.list()
def collect(self) -> None:
"""Determine how heavily utilized the CPU is at the moment."""
milliseconds_per_second = 10**3
timestamp = round(time() * milliseconds_per_second)
self.stats.append((timestamp, psutil.cpu_percent(interval=None)))
def unproxied_stats(self) -> list[tuple[float, float]]:
"""
Convert the statistics to standard Python data types.
Translate the statistics from the multiprocessing
``SyncManager`` 's data structure to a ``list``.
Returns:
A list of (timestamp, % CPU used) data points.
"""
return list(self.stats)
@StatsCollector.subclass
class MemoryStatsCollector(StatsCollector):
"""
Collect memory statistics.
A means of running commands while collecting memory usage
statistics.
"""
stat_name = "memory"
def __init__(self, interval: float, manager: SyncManager) -> None:
"""
Initialize the :class:`MemoryStatsCollector` object.
Parameters:
interval: How many seconds to sleep between polling.
manager: The multiprocessing manager used to control
the process used to collect the statistics.
"""
super().__init__(interval, manager)
self.stats = manager.list()
def collect(self) -> None:
"""Determine how much memory is currently being used."""
milliseconds_per_second = 10**3
timestamp = round(time() * milliseconds_per_second)
self.stats.append((timestamp, psutil.virtual_memory().percent))
def unproxied_stats(self) -> list[tuple[float, float]]:
"""
Convert the statistics to standard Python data types.
Translate the statistics from the multiprocessing
``SyncManager`` 's data structure to a ``list``.
Returns:
A list of (timestamp, % memory used) data points.
"""
return list(self.stats)
# If we don't have `psutil`, return null objects.
else:
[docs]
@StatsCollector.subclass
class DiskStatsCollector(StatsCollector):
"""
A null disk statistics collector for when data aren't available.
A phony :class:`DiskStatsCollector` used when ``psutil`` is
unavailable. This collects no disk statistics.
"""
stat_name = "disk"
def __init__(self, interval: float, manager: SyncManager) -> None:
"""
Initialize the object via the parent's constructor.
Parameters:
interval: How many seconds to sleep between polling.
manager: The multiprocessing manager used to control
the process used to collect the statistics.
"""
super().__init__(interval, manager)
[docs]
def collect(self) -> None:
"""Don't collect any disk statistics."""
pass
[docs]
def unproxied_stats(self) -> None:
"""
If asked for the disk statistics, don't provide any.
Returns:
None
"""
return
[docs]
@StatsCollector.subclass
class CPUStatsCollector(StatsCollector):
"""
A null CPU statistics collector for when data aren't available.
A phony :class:`CPUStatsCollector` used when ``psutil`` is
unavailable. This collects no CPU statistics.
"""
stat_name = "cpu"
def __init__(self, interval: float, manager: SyncManager) -> None:
"""
Initialize the object via the parent's constructor.
Parameters:
interval: How many seconds to sleep between polling.
manager: The multiprocessing manager used to control
the process used to collect the statistics.
"""
super().__init__(interval, manager)
[docs]
def collect(self) -> None:
"""Don't collect any CPU statistics."""
pass
[docs]
def unproxied_stats(self) -> None:
"""
If asked for CPU statistics, don't provide any.
Returns:
None
"""
return
[docs]
@StatsCollector.subclass
class MemoryStatsCollector(StatsCollector):
"""
A null memory stats collector for when data aren't available.
A phony :class:`MemoryStatsCollector` used when ``psutil`` is
unavailable. This collects no memory statistics.
"""
stat_name = "memory"
def __init__(self, interval: float, manager: SyncManager) -> None:
"""
Initialize the object via the parent's constructor.
Parameters:
interval: How many seconds to sleep between polling.
manager: The multiprocessing manager used to control
the process used to collect the statistics.
"""
super().__init__(interval, manager)
[docs]
def collect(self) -> None:
"""Don't collect any memory statistics."""
pass
[docs]
def unproxied_stats(self) -> None:
"""
If asked for memory statistics, don't provide any.
Returns:
None
"""
return