From b722ba3c555052b0b61dfc3f08bbe0802c50f78f Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Mon, 21 Jul 2025 15:44:48 +0200 Subject: [PATCH 1/3] style(fsm): format TheMachine and Discovery Added type annotations to the fsm.py file and used ruff (vscode) to: - Black-compatible code formatting. - fix all auto-fixable violations, like unused imports. - isort-compatible import sorting. Signed-off-by: Paulo Vital --- src/instana/fsm.py | 158 ++++++++++++++++++++++++++------------------- 1 file changed, 92 insertions(+), 66 deletions(-) diff --git a/src/instana/fsm.py b/src/instana/fsm.py index 1897cf30..11eecb8b 100644 --- a/src/instana/fsm.py +++ b/src/instana/fsm.py @@ -8,61 +8,70 @@ import subprocess import sys import threading +from typing import TYPE_CHECKING, Any, Callable, Optional from fysom import Fysom -from .log import logger -from .util import get_default_gateway -from .version import VERSION +from instana.log import logger +from instana.util import get_default_gateway +from instana.version import VERSION +if TYPE_CHECKING: + from instana.agent.host import HostAgent -class Discovery(object): - pid = 0 - name = None - args = None - fd = -1 - inode = "" - def __init__(self, **kwds): +class Discovery: + pid: int = 0 + name: Optional[str] = None + args: Optional[List[str]] = None + fd: int = -1 + inode: str = "" + + def __init__(self, **kwds: Any) -> None: self.__dict__.update(kwds) - def to_dict(self): - kvs = dict() - kvs['pid'] = self.pid - kvs['name'] = self.name - kvs['args'] = self.args - kvs['fd'] = self.fd - kvs['inode'] = self.inode + def to_dict(self) -> Dict[str, Any]: + kvs: Dict[str, Any] = dict() + kvs["pid"] = self.pid + kvs["name"] = self.name + kvs["args"] = self.args + kvs["fd"] = self.fd + kvs["inode"] = self.inode return kvs -class TheMachine(object): +class TheMachine: RETRY_PERIOD = 30 THREAD_NAME = "Instana Machine" - agent = None + agent: Optional["HostAgent"] = None fsm = None timer = None warnedPeriodic = False - def __init__(self, agent): + def __init__(self, agent: "HostAgent") -> None: logger.debug("Initializing host agent state machine") self.agent = agent - self.fsm = Fysom({ - "events": [ - ("lookup", "*", "found"), - ("announce", "found", "announced"), - ("pending", "announced", "wait4init"), - ("ready", "wait4init", "good2go")], - "callbacks": { - # Can add the following to debug - # "onchangestate": self.print_state_change, - "onlookup": self.lookup_agent_host, - "onannounce": self.announce_sensor, - "onpending": self.on_ready, - "ongood2go": self.on_good2go}}) + self.fsm = Fysom( + { + "events": [ + ("lookup", "*", "found"), + ("announce", "found", "announced"), + ("pending", "announced", "wait4init"), + ("ready", "wait4init", "good2go"), + ], + "callbacks": { + # Can add the following to debug + # "onchangestate": self.print_state_change, + "onlookup": self.lookup_agent_host, + "onannounce": self.announce_sensor, + "onpending": self.on_ready, + "ongood2go": self.on_good2go, + }, + } + ) self.timer = threading.Timer(1, self.fsm.lookup) self.timer.daemon = True @@ -70,11 +79,12 @@ def __init__(self, agent): self.timer.start() @staticmethod - def print_state_change(e): - logger.debug('========= (%i#%s) FSM event: %s, src: %s, dst: %s ==========', - os.getpid(), threading.current_thread().name, e.event, e.src, e.dst) + def print_state_change(e: Any) -> None: + logger.debug( + f"========= ({os.getpid()}#{threading.current_thread().name}) FSM event: {e.event}, src: {e.src}, dst: {e.dst} ==========" + ) - def reset(self): + def reset(self) -> None: """ reset is called to start from scratch in a process. It may be called on first boot or after a detected fork. @@ -87,7 +97,7 @@ def reset(self): logger.debug("State machine being reset. Will start a new announce cycle.") self.fsm.lookup() - def lookup_agent_host(self, e): + def lookup_agent_host(self, e: Any) -> bool: host = self.agent.options.agent_host port = self.agent.options.agent_port @@ -105,39 +115,43 @@ def lookup_agent_host(self, e): return True if self.warnedPeriodic is False: - logger.info("Instana Host Agent couldn't be found. Will retry periodically...") + logger.info( + "Instana Host Agent couldn't be found. Will retry periodically..." + ) self.warnedPeriodic = True - self.schedule_retry(self.lookup_agent_host, e, self.THREAD_NAME + ": agent_lookup") + self.schedule_retry( + self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup" + ) return False - def announce_sensor(self, e): - logger.debug("Attempting to make an announcement to the agent on %s:%d", - self.agent.options.agent_host, self.agent.options.agent_port) + def announce_sensor(self, e: Any) -> bool: + logger.debug( + f"Attempting to make an announcement to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}" + ) pid = os.getpid() try: if os.path.isfile("/proc/self/cmdline"): with open("/proc/self/cmdline") as cmd: cmdinfo = cmd.read() - cmdline = cmdinfo.split('\x00') + cmdline = cmdinfo.split("\x00") else: # Python doesn't provide a reliable method to determine what # the OS process command line may be. Here we are forced to # rely on ps rather than adding a dependency on something like # psutil which requires dev packages, gcc etc... - proc = subprocess.Popen(["ps", "-p", str(pid), "-o", "command"], - stdout=subprocess.PIPE) + proc = subprocess.Popen( + ["ps", "-p", str(pid), "-o", "command"], stdout=subprocess.PIPE + ) (out, _) = proc.communicate() - parts = out.split(b'\n') + parts = out.split(b"\n") cmdline = [parts[1].decode("utf-8")] except Exception: cmdline = sys.argv logger.debug("announce_sensor", exc_info=True) - d = Discovery(pid=self.__get_real_pid(), - name=cmdline[0], - args=cmdline[1:]) + d = Discovery(pid=self.__get_real_pid(), name=cmdline[0], args=cmdline[1:]) # If we're on a system with a procfs if os.path.exists("/proc/"): @@ -146,47 +160,56 @@ def announce_sensor(self, e): # PermissionError: [Errno 13] Permission denied: '/proc/6/fd/8' # Use a try/except as a safety sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((self.agent.options.agent_host, self.agent.options.agent_port)) - path = "/proc/%d/fd/%d" % (pid, sock.fileno()) + sock.connect( + (self.agent.options.agent_host, self.agent.options.agent_port) + ) + path = f"/proc/{pid}/fd/{sock.fileno()}" d.fd = sock.fileno() d.inode = os.readlink(path) - except: + except: # noqa: E722 logger.debug("Error generating file descriptor: ", exc_info=True) payload = self.agent.announce(d) if not payload: logger.debug("Cannot announce sensor. Scheduling retry.") - self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce") + self.schedule_retry( + self.announce_sensor, e, f"{self.THREAD_NAME}: announce" + ) return False - + self.agent.set_from(payload) self.fsm.pending() - logger.debug("Announced pid: %s (true pid: %s). Waiting for Agent Ready...", - str(pid), str(self.agent.announce_data.pid)) + logger.debug( + f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..." + ) return True - def schedule_retry(self, fun, e, name): + def schedule_retry(self, fun: Callable, e: Any, name: str) -> None: self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e]) self.timer.daemon = True self.timer.name = name self.timer.start() - def on_ready(self, _): + def on_ready(self, _: Any) -> None: self.agent.start() ns_pid = str(os.getpid()) true_pid = str(self.agent.announce_data.pid) - logger.info("Instana host agent available. We're in business. Announced PID: %s (true pid: %s)", ns_pid, true_pid) + logger.info( + f"Instana host agent available. We're in business. Announced PID: {ns_pid} (true PID: {true_pid})" + ) - def on_good2go(self, _): + def on_good2go(self, _: Any) -> None: ns_pid = str(os.getpid()) true_pid = str(self.agent.announce_data.pid) - self.agent.log_message_to_host_agent("Instana Python Package %s: PID %s (true pid: %s) is now online and reporting" % (VERSION, ns_pid, true_pid)) + self.agent.log_message_to_host_agent( + f"Instana Python Package {VERSION}: PID {ns_pid} (true PID: {true_pid}) is now online and reporting" + ) - def __get_real_pid(self): + def __get_real_pid(self) -> int: """ Attempts to determine the true process ID by querying the /proc//sched file. This works on systems with a proc filesystem. @@ -195,14 +218,14 @@ def __get_real_pid(self): pid = None if os.path.exists("/proc/"): - sched_file = "/proc/%d/sched" % os.getpid() + sched_file = f"/proc/{os.getpid()}/sched" if os.path.isfile(sched_file): try: file = open(sched_file) line = file.readline() - g = re.search(r'\((\d+),', line) - if len(g.groups()) == 1: + g = re.search(r"\((\d+),", line) + if g and len(g.groups()) == 1: pid = int(g.groups()[0]) except Exception: logger.debug("parsing sched file failed", exc_info=True) @@ -211,3 +234,6 @@ def __get_real_pid(self): pid = os.getpid() return pid + + +# Made with Bob From 377992ee0af911b3471b47b72e302ae71fa510b5 Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Tue, 22 Jul 2025 10:21:22 +0200 Subject: [PATCH 2/3] refactor: Make Discovery a DataClass. And move it out of the fsm.py file. Signed-off-by: Paulo Vital --- src/instana/agent/host.py | 9 ++++++--- src/instana/fsm.py | 27 ++------------------------- src/instana/util/process_discovery.py | 13 +++++++++++++ tests/agent/test_host.py | 4 +++- 4 files changed, 24 insertions(+), 29 deletions(-) create mode 100644 src/instana/util/process_discovery.py diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 177ca44c..ad39440c 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -9,7 +9,7 @@ import json import os from datetime import datetime -from typing import Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union import requests import urllib3 @@ -17,7 +17,7 @@ from instana.agent.base import BaseAgent from instana.collector.host import HostCollector -from instana.fsm import Discovery, TheMachine +from instana.fsm import TheMachine from instana.log import logger from instana.options import StandardOptions from instana.util import to_json @@ -25,6 +25,9 @@ from instana.util.span_utils import get_operation_specifiers from instana.version import VERSION +if TYPE_CHECKING: + from instana.util.process_discovery import Discovery + class AnnounceData(object): """The Announce Payload""" @@ -176,7 +179,7 @@ def is_agent_listening( def announce( self, - discovery: Discovery, + discovery: "Discovery", ) -> Optional[Dict[str, Any]]: """ With the passed in Discovery class, attempt to announce to the host agent. diff --git a/src/instana/fsm.py b/src/instana/fsm.py index 11eecb8b..7355a0ab 100644 --- a/src/instana/fsm.py +++ b/src/instana/fsm.py @@ -8,46 +8,23 @@ import subprocess import sys import threading -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Callable from fysom import Fysom from instana.log import logger from instana.util import get_default_gateway +from instana.util.process_discovery import Discovery from instana.version import VERSION if TYPE_CHECKING: from instana.agent.host import HostAgent -class Discovery: - pid: int = 0 - name: Optional[str] = None - args: Optional[List[str]] = None - fd: int = -1 - inode: str = "" - - def __init__(self, **kwds: Any) -> None: - self.__dict__.update(kwds) - - def to_dict(self) -> Dict[str, Any]: - kvs: Dict[str, Any] = dict() - kvs["pid"] = self.pid - kvs["name"] = self.name - kvs["args"] = self.args - kvs["fd"] = self.fd - kvs["inode"] = self.inode - return kvs - - class TheMachine: RETRY_PERIOD = 30 THREAD_NAME = "Instana Machine" - agent: Optional["HostAgent"] = None - fsm = None - timer = None - warnedPeriodic = False def __init__(self, agent: "HostAgent") -> None: diff --git a/src/instana/util/process_discovery.py b/src/instana/util/process_discovery.py new file mode 100644 index 00000000..6a83efe5 --- /dev/null +++ b/src/instana/util/process_discovery.py @@ -0,0 +1,13 @@ +# (c) Copyright IBM Corp. 2025 + +from dataclasses import dataclass +from typing import List, Optional + + +@dataclass +class Discovery: + pid: int = 0 # the PID of this process + name: Optional[str] = None # the name of the executable + args: Optional[List[str]] = None # the command line arguments + fd: int = -1 # the file descriptor of the socket associated with the connection to the agent for this HTTP request + inode: str = "" # the inode of the socket associated with the connection to the agent for this HTTP request diff --git a/tests/agent/test_host.py b/tests/agent/test_host.py index 29b5fd10..058c676c 100644 --- a/tests/agent/test_host.py +++ b/tests/agent/test_host.py @@ -14,12 +14,13 @@ from instana.agent.host import AnnounceData, HostAgent from instana.collector.host import HostCollector -from instana.fsm import Discovery, TheMachine +from instana.fsm import TheMachine from instana.options import StandardOptions from instana.recorder import StanRecorder from instana.singletons import get_agent from instana.span.span import InstanaSpan from instana.span_context import SpanContext +from instana.util.process_discovery import Discovery from instana.util.runtime import is_windows @@ -715,3 +716,4 @@ def test_is_service_or_endpoint_ignored(self) -> None: # don't ignore other services assert not self.agent._HostAgent__is_endpoint_ignored("service3") + assert not self.agent._HostAgent__is_endpoint_ignored("service3") From 1c4682231f733e658cb8981626a77dcc3410df06 Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Thu, 24 Jul 2025 15:26:09 +0200 Subject: [PATCH 3/3] feat(fsm): add support to announce Windows processes. This commit adds cross-platform process announcement to Instana Host Agents. The implementation gracefully handles platform differences, ensuring consistent process information on both Unix and Windows environments: - Created a new `_get_cmdline()` function to return the command line of the current monitored process independently of the running platform. - Created the `_get_cmdline_windows()` function to return the command line on Windows machines. - Created `_get_cmdline_unix()` that returns the command line in Unix machines. It decides how to collect the information by running either the ` _get_cmdline_linux_proc()` or the `_get_cmdline_unix_ps()`. - Refactored the `_setup_socket_connection()` function. Signed-off-by: Paulo Vital --- src/instana/fsm.py | 141 +++++++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 44 deletions(-) diff --git a/src/instana/fsm.py b/src/instana/fsm.py index 7355a0ab..be355b1a 100644 --- a/src/instana/fsm.py +++ b/src/instana/fsm.py @@ -8,13 +8,14 @@ import subprocess import sys import threading -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any, Callable, List from fysom import Fysom from instana.log import logger from instana.util import get_default_gateway from instana.util.process_discovery import Discovery +from instana.util.runtime import is_windows from instana.version import VERSION if TYPE_CHECKING: @@ -103,48 +104,16 @@ def lookup_agent_host(self, e: Any) -> bool: return False def announce_sensor(self, e: Any) -> bool: + pid: int = os.getpid() logger.debug( - f"Attempting to make an announcement to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}" + f"Attempting to announce PID {pid} to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}" ) - pid = os.getpid() - try: - if os.path.isfile("/proc/self/cmdline"): - with open("/proc/self/cmdline") as cmd: - cmdinfo = cmd.read() - cmdline = cmdinfo.split("\x00") - else: - # Python doesn't provide a reliable method to determine what - # the OS process command line may be. Here we are forced to - # rely on ps rather than adding a dependency on something like - # psutil which requires dev packages, gcc etc... - proc = subprocess.Popen( - ["ps", "-p", str(pid), "-o", "command"], stdout=subprocess.PIPE - ) - (out, _) = proc.communicate() - parts = out.split(b"\n") - cmdline = [parts[1].decode("utf-8")] - except Exception: - cmdline = sys.argv - logger.debug("announce_sensor", exc_info=True) + cmdline = self._get_cmdline(pid) d = Discovery(pid=self.__get_real_pid(), name=cmdline[0], args=cmdline[1:]) - # If we're on a system with a procfs - if os.path.exists("/proc/"): - try: - # In CentOS 7, some odd things can happen such as: - # PermissionError: [Errno 13] Permission denied: '/proc/6/fd/8' - # Use a try/except as a safety - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect( - (self.agent.options.agent_host, self.agent.options.agent_port) - ) - path = f"/proc/{pid}/fd/{sock.fileno()}" - d.fd = sock.fileno() - d.inode = os.readlink(path) - except: # noqa: E722 - logger.debug("Error generating file descriptor: ", exc_info=True) + self._setup_socket_connection(d, pid) payload = self.agent.announce(d) @@ -189,28 +158,112 @@ def on_good2go(self, _: Any) -> None: def __get_real_pid(self) -> int: """ Attempts to determine the true process ID by querying the - /proc//sched file. This works on systems with a proc filesystem. - Otherwise default to os default. + /proc//sched file on Linux systems or using the OS default PID. + For Windows, we use the standard OS PID as there's no equivalent concept + of container PIDs vs host PIDs. """ pid = None + # For Linux systems with procfs if os.path.exists("/proc/"): sched_file = f"/proc/{os.getpid()}/sched" if os.path.isfile(sched_file): try: - file = open(sched_file) - line = file.readline() - g = re.search(r"\((\d+),", line) - if g and len(g.groups()) == 1: - pid = int(g.groups()[0]) + with open(sched_file) as file: + line = file.readline() + g = re.search(r"\((\d+),", line) + if g and len(g.groups()) == 1: + pid = int(g.groups()[0]) except Exception: logger.debug("parsing sched file failed", exc_info=True) + # For Windows or if Linux method failed if pid is None: pid = os.getpid() return pid + def _get_cmdline_windows(self) -> List[str]: + """ + Get command line using Windows API + """ + import ctypes + from ctypes import wintypes + + GetCommandLineW = ctypes.windll.kernel32.GetCommandLineW + GetCommandLineW.argtypes = [] + GetCommandLineW.restype = wintypes.LPCWSTR + + cmd = GetCommandLineW() + # Simple parsing - this is a basic approach and might need refinement + # for complex command lines with quotes and spaces + return cmd.split() + + def _get_cmdline_linux_proc(self) -> List[str]: + """ + Get command line from Linux /proc filesystem + """ + with open("/proc/self/cmdline") as cmd: + cmdinfo = cmd.read() + return cmdinfo.split("\x00") + + def _get_cmdline_unix_ps(self, pid: int) -> List[str]: + """ + Get command line using ps command (for Unix-like systems without /proc) + """ + proc = subprocess.Popen( + ["ps", "-p", str(pid), "-o", "command"], stdout=subprocess.PIPE + ) + (out, _) = proc.communicate() + parts = out.split(b"\n") + return [parts[1].decode("utf-8")] + + def _get_cmdline_unix(self, pid: int) -> List[str]: + """ + Get command line using Unix + """ + if os.path.isfile("/proc/self/cmdline"): + return self._get_cmdline_linux_proc() + else: + return self._get_cmdline_unix_ps(pid) + + def _get_cmdline(self, pid: int) -> List[str]: + """ + Get command line in a platform-independent way + """ + try: + if is_windows(): + return self._get_cmdline_windows() + else: + return self._get_cmdline_unix(pid) + except Exception: + logger.debug("Error getting command line", exc_info=True) + return sys.argv + + def _setup_socket_connection(self, discovery: Discovery, pid: int) -> None: + """ + Set up socket connection and populate discovery object with socket details + """ + try: + # In CentOS 7, some odd things can happen such as: + # PermissionError: [Errno 13] Permission denied: '/proc/6/fd/8' + # Use a try/except as a safety + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.agent.options.agent_host, self.agent.options.agent_port)) + discovery.fd = sock.fileno() + + # If we're on a system with a procfs (Linux) + if os.path.exists("/proc/"): + try: + path = "/proc/%d/fd/%d" % (pid, sock.fileno()) + discovery.inode = os.readlink(path) + except Exception: + logger.debug( + "Error generating file descriptor inode: ", exc_info=True + ) + except Exception: + logger.debug("Error creating socket connection: ", exc_info=True) + # Made with Bob