Skip to content

Fix announce of processes on Windows. #767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import requests
import urllib3
from requests import Response

from instana.agent.base import BaseAgent
from instana.collector.host import HostCollector
from instana.fsm import Discovery, TheMachine
from instana.fsm import TheMachine
from instana.log import logger
from instana.options import StandardOptions
from instana.util import to_json
from instana.util.runtime import get_py_source, log_runtime_env_info
from instana.util.span_utils import get_operation_specifiers
from instana.version import VERSION

if TYPE_CHECKING:
from instana.util.process_discovery import Discovery


class AnnounceData(object):
"""The Announce Payload"""
Expand Down Expand Up @@ -176,7 +179,7 @@ def is_agent_listening(

def announce(
self,
discovery: Discovery,
discovery: "Discovery",
) -> Optional[Dict[str, Any]]:
"""
With the passed in Discovery class, attempt to announce to the host agent.
Expand Down
264 changes: 160 additions & 104 deletions src/instana/fsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,73 +8,61 @@
import subprocess
import sys
import threading
from typing import TYPE_CHECKING, Any, Callable, List

from fysom import Fysom

from .log import logger
from .util import get_default_gateway
from .version import VERSION
from instana.log import logger
from instana.util import get_default_gateway
from instana.util.process_discovery import Discovery
from instana.util.runtime import is_windows
from instana.version import VERSION

if TYPE_CHECKING:
from instana.agent.host import HostAgent

class Discovery(object):
    """
    Plain container for the process details sent in an announce request.

    Any keyword argument passed to the constructor becomes an instance
    attribute; the class-level values below act as defaults for fields that
    were not supplied.
    """

    # Defaults used when a field is not passed to the constructor.
    pid = 0
    name = None
    args = None
    fd = -1
    inode = ""

    def __init__(self, **kwds):
        # Copy every supplied keyword straight onto the instance.
        vars(self).update(kwds)

    def to_dict(self):
        """Return the five announce fields as a dict, in a fixed key order."""
        field_names = ("pid", "name", "args", "fd", "inode")
        return {field: getattr(self, field) for field in field_names}


class TheMachine(object):
class TheMachine:
RETRY_PERIOD = 30
THREAD_NAME = "Instana Machine"

agent = None
fsm = None
timer = None

warnedPeriodic = False

def __init__(self, agent: "HostAgent") -> None:
    """
    Build the announce state machine for `agent` and schedule the first lookup.

    The machine walks through the events lookup -> announce -> pending ->
    ready; each callback below is bound to a method of this class. A daemon
    timer fires `fsm.lookup` one second after construction so that the
    constructor itself never blocks.
    """
    logger.debug("Initializing host agent state machine")

    self.agent = agent
    self.fsm = Fysom(
        {
            "events": [
                # (event name, source state, destination state)
                ("lookup", "*", "found"),
                ("announce", "found", "announced"),
                ("pending", "announced", "wait4init"),
                ("ready", "wait4init", "good2go"),
            ],
            "callbacks": {
                # Can add the following to debug
                # "onchangestate": self.print_state_change,
                "onlookup": self.lookup_agent_host,
                "onannounce": self.announce_sensor,
                "onpending": self.on_ready,
                "ongood2go": self.on_good2go,
            },
        }
    )

    # Kick off the first lookup from a named daemon thread.
    self.timer = threading.Timer(1, self.fsm.lookup)
    self.timer.daemon = True
    self.timer.name = self.THREAD_NAME
    self.timer.start()

@staticmethod
def print_state_change(e: Any) -> None:
    """
    Debug helper: log an FSM transition together with the current PID and thread.

    `e` is the event object handed to FSM callbacks; its `event`, `src` and
    `dst` attributes are read.
    """
    # Lazy %-style arguments: the message is only formatted when debug
    # logging is actually enabled.
    logger.debug(
        "========= (%i#%s) FSM event: %s, src: %s, dst: %s ==========",
        os.getpid(),
        threading.current_thread().name,
        e.event,
        e.src,
        e.dst,
    )

def reset(self):
def reset(self) -> None:
"""
reset is called to start from scratch in a process. It may be called on first boot or
after a detected fork.
Expand All @@ -87,7 +75,7 @@ def reset(self):
logger.debug("State machine being reset. Will start a new announce cycle.")
self.fsm.lookup()

def lookup_agent_host(self, e):
def lookup_agent_host(self, e: Any) -> bool:
host = self.agent.options.agent_host
port = self.agent.options.agent_port

Expand All @@ -105,109 +93,177 @@ def lookup_agent_host(self, e):
return True

if self.warnedPeriodic is False:
logger.info("Instana Host Agent couldn't be found. Will retry periodically...")
logger.info(
"Instana Host Agent couldn't be found. Will retry periodically..."
)
self.warnedPeriodic = True

self.schedule_retry(self.lookup_agent_host, e, self.THREAD_NAME + ": agent_lookup")
self.schedule_retry(
self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup"
)
return False

def announce_sensor(self, e: Any) -> bool:
    """
    Announce this process to the host agent.

    Collects the process command line and PID, opens a connection to the
    agent to obtain a file descriptor/inode pair, and sends the resulting
    Discovery through `self.agent.announce`.

    Returns True when the agent answered with a payload (the machine then
    fires `pending`), and False when the announce failed; in that case a
    retry of this method is scheduled.
    """
    pid: int = os.getpid()
    logger.debug(
        f"Attempting to announce PID {pid} to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}"
    )

    cmdline = self._get_cmdline(pid)

    # First element is announced as the process name, the rest as arguments.
    d = Discovery(pid=self.__get_real_pid(), name=cmdline[0], args=cmdline[1:])

    self._setup_socket_connection(d, pid)

    payload = self.agent.announce(d)

    if not payload:
        logger.debug("Cannot announce sensor. Scheduling retry.")
        self.schedule_retry(
            self.announce_sensor, e, f"{self.THREAD_NAME}: announce"
        )
        return False

    self.agent.set_from(payload)
    self.fsm.pending()
    logger.debug(
        f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..."
    )
    return True

def schedule_retry(self, fun: Callable, e: Any, name: str) -> None:
    """
    Re-run `fun(e)` after RETRY_PERIOD seconds on a named daemon timer thread.

    The timer is stored on `self.timer`, replacing the reference to any
    earlier timer (the earlier timer itself is not cancelled here).
    """
    self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
    # Daemon thread so a pending retry never keeps the process alive.
    self.timer.daemon = True
    self.timer.name = name
    self.timer.start()

def on_ready(self, _: Any) -> None:
    """
    FSM callback for the `pending` event: start the agent and log both PIDs.

    The event argument is unused.
    """
    self.agent.start()

    # PID as seen by this process vs. the PID stored in the announce data.
    ns_pid = str(os.getpid())
    true_pid = str(self.agent.announce_data.pid)

    logger.info(
        f"Instana host agent available. We're in business. Announced PID: {ns_pid} (true PID: {true_pid})"
    )

def on_good2go(self, _: Any) -> None:
    """
    FSM callback registered as `ongood2go`: tell the host agent we are online.

    Sends one log line through the agent containing the package version and
    both the local PID and the PID stored in the announce data. The event
    argument is unused.
    """
    ns_pid = str(os.getpid())
    true_pid = str(self.agent.announce_data.pid)

    self.agent.log_message_to_host_agent(
        f"Instana Python Package {VERSION}: PID {ns_pid} (true PID: {true_pid}) is now online and reporting"
    )

def __get_real_pid(self):
def __get_real_pid(self) -> int:
"""
Attempts to determine the true process ID by querying the
/proc/<pid>/sched file. This works on systems with a proc filesystem.
Otherwise default to os default.
/proc/<pid>/sched file on Linux systems or using the OS default PID.
For Windows, we use the standard OS PID as there's no equivalent concept
of container PIDs vs host PIDs.
"""
pid = None

# For Linux systems with procfs
if os.path.exists("/proc/"):
sched_file = "/proc/%d/sched" % os.getpid()
sched_file = f"/proc/{os.getpid()}/sched"

if os.path.isfile(sched_file):
try:
file = open(sched_file)
line = file.readline()
g = re.search(r'\((\d+),', line)
if len(g.groups()) == 1:
pid = int(g.groups()[0])
with open(sched_file) as file:
line = file.readline()
g = re.search(r"\((\d+),", line)
if g and len(g.groups()) == 1:
pid = int(g.groups()[0])
except Exception:
logger.debug("parsing sched file failed", exc_info=True)

# For Windows or if Linux method failed
if pid is None:
pid = os.getpid()

return pid

def _get_cmdline_windows(self) -> List[str]:
    """
    Get the command line of the current process using the Windows API.

    The raw string from `GetCommandLineW` is parsed by `CommandLineToArgvW`,
    so quoted arguments and paths containing spaces (for example
    "C:\\Program Files\\Python\\python.exe") come back as single elements
    rather than being split on every whitespace character.

    Raises OSError when the Windows call fails. Must only be called on
    Windows; the ctypes Windows loaders do not exist on other platforms.
    """
    import ctypes
    from ctypes import wintypes

    # Private library handles: prototypes set here do not leak into the
    # process-wide `ctypes.windll` cache shared with other packages.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    shell32 = ctypes.WinDLL("shell32", use_last_error=True)

    GetCommandLineW = kernel32.GetCommandLineW
    GetCommandLineW.argtypes = []
    GetCommandLineW.restype = wintypes.LPCWSTR

    CommandLineToArgvW = shell32.CommandLineToArgvW
    CommandLineToArgvW.argtypes = [wintypes.LPCWSTR, ctypes.POINTER(ctypes.c_int)]
    CommandLineToArgvW.restype = ctypes.POINTER(wintypes.LPWSTR)

    LocalFree = kernel32.LocalFree
    LocalFree.argtypes = [wintypes.HLOCAL]
    LocalFree.restype = wintypes.HLOCAL

    argc = ctypes.c_int(0)
    argv = CommandLineToArgvW(GetCommandLineW(), ctypes.byref(argc))
    if not argv:
        raise ctypes.WinError(ctypes.get_last_error())

    try:
        # Copy the strings out before the buffer is released.
        return [argv[i] for i in range(argc.value)]
    finally:
        # The argv array is allocated by the API and must be freed by us.
        LocalFree(argv)

def _get_cmdline_linux_proc(self) -> List[str]:
"""
Get command line from Linux /proc filesystem
"""
with open("/proc/self/cmdline") as cmd:
cmdinfo = cmd.read()
return cmdinfo.split("\x00")

def _get_cmdline_unix_ps(self, pid: int) -> List[str]:
    """
    Get the command line of `pid` by running `ps -p <pid> -o command`.

    Returns a single-element list holding the second line of the ps output
    (presumably the first line is the column header - confirm against the
    target platforms). The command string is not split into arguments.
    """
    completed = subprocess.run(
        ["ps", "-p", str(pid), "-o", "command"], stdout=subprocess.PIPE
    )
    output_lines = completed.stdout.split(b"\n")
    command = output_lines[1].decode("utf-8")
    return [command]

def _get_cmdline_unix(self, pid: int) -> List[str]:
"""
Get command line using Unix
"""
if os.path.isfile("/proc/self/cmdline"):
return self._get_cmdline_linux_proc()
else:
return self._get_cmdline_unix_ps(pid)

def _get_cmdline(self, pid: int) -> List[str]:
    """
    Get command line in a platform-independent way

    Dispatches to the Windows or Unix helper. Falls back to `sys.argv` when
    the helper raises (the error is logged at debug level) or when it returns
    an empty list, because callers index the first element of the result.
    """
    try:
        if is_windows():
            cmdline = self._get_cmdline_windows()
        else:
            cmdline = self._get_cmdline_unix(pid)
    except Exception:
        logger.debug("Error getting command line", exc_info=True)
        return sys.argv

    # An empty result would make `cmdline[0]` fail in the caller.
    return cmdline if cmdline else sys.argv

def _setup_socket_connection(self, discovery: Discovery, pid: int) -> None:
"""
Set up socket connection and populate discovery object with socket details
"""
try:
# In CentOS 7, some odd things can happen such as:
# PermissionError: [Errno 13] Permission denied: '/proc/6/fd/8'
# Use a try/except as a safety
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.agent.options.agent_host, self.agent.options.agent_port))
discovery.fd = sock.fileno()

# If we're on a system with a procfs (Linux)
if os.path.exists("/proc/"):
try:
path = "/proc/%d/fd/%d" % (pid, sock.fileno())
discovery.inode = os.readlink(path)
except Exception:
logger.debug(
"Error generating file descriptor inode: ", exc_info=True
)
except Exception:
logger.debug("Error creating socket connection: ", exc_info=True)


# Made with Bob
Loading
Loading