nailgun-client/py/ng.py (771 lines of code) (raw):
#!/usr/bin/env python
#
# Copyright 2004-2015, Martian Software, Inc.
# Copyright 2017-Present Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import ctypes
import platform
import optparse
import os
import os.path
import select
import socket
import struct
import sys
import time
from io import BytesIO
from threading import Condition, Event, Thread, RLock
# Interpreter-compatibility shims: this module runs on both Python 2 and 3.
is_py2 = sys.version_info[0] == 2

# io.UnsupportedOperation exists on both major versions, so import it
# unconditionally: the ``except UnsupportedOperation`` clause used when probing
# file descriptors must be able to resolve the name on Python 2 as well.
from io import UnsupportedOperation

if is_py2:
    import Queue as Queue
    import __builtin__ as builtin

    def to_bytes(s):
        """Return *s* unchanged; a Python 2 ``str`` is already a byte string."""
        return s


else:
    import queue as Queue
    import builtins as builtin

    def to_bytes(s):
        """
        Return *s* as ``bytes``.

        Text is encoded as UTF-8. Values that are already ``bytes`` or
        ``bytearray`` are passed through instead of raising TypeError, so
        binary payloads can be queued for sending as well.
        """
        if isinstance(s, (bytes, bytearray)):
            return bytes(s)
        return bytes(s, "utf-8")


if sys.platform == "win32":
    import os
    import msvcrt

    # Put stdout/stderr into binary mode so forwarded server output is written
    # without newline translation.
    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
# @author <a href="http://www.martiansoftware.com/contact.html">Marty Lamb</a>
# @author Pete Kirkham (Win32 port)
# @author Sergey Balabanov, Ben Hamilton (Python port)
#
# Please try to keep this working on Python 2.6.
# Version string printed by --nailgun-showversion.
NAILGUN_VERSION = "1.0.0"
# Size in bytes of the per-connection scratch buffer.
BUFSIZE = 2048
# Port used when neither NAILGUN_PORT nor --nailgun-port is given.
NAILGUN_PORT_DEFAULT = 2113
# A chunk header is packed/unpacked with the struct format ">ic":
# a 4-byte big-endian length followed by a 1-byte chunk type.
CHUNK_HEADER_LEN = 5
# Timeout passed to Thread.join() for the worker threads at shutdown.
THREAD_TERMINATION_TIMEOUT_SEC = 0.5
# NOTE(review): not referenced anywhere in the visible part of this file.
STDIN_BUFFER_LINE_SIZE = 10
# One-byte chunk type markers of the nailgun wire protocol.
CHUNKTYPE_STDIN = b"0"
CHUNKTYPE_STDOUT = b"1"
CHUNKTYPE_STDERR = b"2"
CHUNKTYPE_STDIN_EOF = b"."
CHUNKTYPE_ARG = b"A"
CHUNKTYPE_LONGARG = b"L"
CHUNKTYPE_ENV = b"E"
CHUNKTYPE_DIR = b"D"
CHUNKTYPE_CMD = b"C"
CHUNKTYPE_EXIT = b"X"
CHUNKTYPE_SENDINPUT = b"S"
CHUNKTYPE_HEARTBEAT = b"H"
# Conversion factor between seconds and the nanosecond timestamps returned by
# monotonic_time_nanos().
NSEC_PER_SEC = 1000000000
# Default pause between two heartbeat chunks.
DEFAULT_HEARTBEAT_INTERVAL_SEC = 0.5
# Upper bound for a single transport.select() call in the receive loop.
SELECT_MAX_BLOCK_TIME_SEC = 1.0
# How long the send thread waits for shutdown after a failed send of an
# optional chunk before treating the failure as an error.
SEND_THREAD_WAIT_TERMINATION_SEC = 5.0
# We need to support Python 2.6 hosts which lack memoryview().
HAS_MEMORYVIEW = "memoryview" in dir(builtin)
# NOTE(review): the EVENT_STDIN_* values are not referenced in the visible part
# of this file.
EVENT_STDIN_CHUNK = 0
EVENT_STDIN_CLOSED = 1
EVENT_STDIN_EXCEPTION = 2
def compat_memoryview_py2(buf):
    """Wrap *buf* in a plain memoryview (the Python 2 flavour, no cast)."""
    view = memoryview(buf)
    return view
def compat_memoryview_py3(buf):
    """Wrap *buf* in a memoryview whose elements are cast to format ``"c"``."""
    raw_view = memoryview(buf)
    return raw_view.cast("c")
# On Python 3, a memoryview wrapping ctypes.create_string_buffer() has problems
# with that type's default format ('<c') and assignment operators, so the view
# is cast to a plain 'c' array there (a byte order for single bytes does not
# make sense anyway). 'cast' does not exist on Python 2, so the helper is
# selected per interpreter version.
compat_memoryview = compat_memoryview_py2 if is_py2 else compat_memoryview_py3
class NailgunException(Exception):
    """
    Error raised by the nailgun client.

    ``message`` is the human readable description returned by ``str()``;
    ``code`` is one of the class-level constants below and is used by
    ``main()`` as the process exit status.
    """

    SOCKET_FAILED = 231
    CONNECT_FAILED = 230
    UNEXPECTED_CHUNKTYPE = 229
    CONNECTION_BROKEN = 227

    def __init__(self, message, code):
        # Initialise the base class so that ``args`` (and therefore repr() and
        # pickling) carry the message and code instead of being empty.
        super(NailgunException, self).__init__(message, code)
        self.message = message
        self.code = code

    def __str__(self):
        return self.message
class Transport(object):
    """
    Abstract byte transport to the nailgun server.

    Every method raises NotImplementedError here; the concrete transports in
    this file override them.
    """

    def close(self):
        """Release the underlying connection."""
        raise NotImplementedError()

    def sendall(self, data):
        """Send all of *data*."""
        raise NotImplementedError()

    def recv(self, size):
        """Receive and return up to *size* bytes."""
        raise NotImplementedError()

    def recv_into(self, buffer, size=None):
        """Receive up to *size* bytes into *buffer* and return the byte count."""
        raise NotImplementedError()

    def select(self, timeout_secs):
        """Wait up to *timeout_secs*; return a ``(readable, exceptional)`` pair."""
        raise NotImplementedError()
class UnixTransport(Transport):
    """
    Transport backed by a connected socket object.

    Receives use MSG_WAITALL and sends use MSG_NOSIGNAL whenever the platform's
    socket module defines those flags.
    """

    def __init__(self, __socket):
        self.__socket = __socket
        self.recv_flags = 0
        self.send_flags = 0
        if hasattr(socket, "MSG_WAITALL"):
            self.recv_flags |= socket.MSG_WAITALL
        if hasattr(socket, "MSG_NOSIGNAL"):
            self.send_flags |= socket.MSG_NOSIGNAL

    def close(self):
        """Close the underlying socket."""
        return self.__socket.close()

    def sendall(self, data):
        """Send all of *data* using the configured send flags."""
        return self.__socket.sendall(data, self.send_flags)

    def recv(self, nbytes):
        """Receive and return up to *nbytes* bytes."""
        return self.__socket.recv(nbytes, self.recv_flags)

    def recv_into(self, buffer, nbytes=None):
        """
        Receive into *buffer* and return the number of bytes stored.

        When *nbytes* is omitted, up to ``len(buffer)`` bytes are received.
        """
        # socket.recv_into() only accepts an integer here; 0 means "use the
        # size of the buffer", so map the documented default of None onto it
        # instead of letting the call fail with a TypeError.
        return self.__socket.recv_into(buffer, nbytes or 0, self.recv_flags)

    def select(self, timeout_secs):
        """
        Wait up to *timeout_secs* for the socket to become readable.

        Returns a ``(readable, exceptional)`` pair of booleans.
        """
        select_list = [self.__socket]
        readable, _, exceptional = select.select(
            select_list, [], select_list, timeout_secs
        )
        return (self.__socket in readable), (self.__socket in exceptional)
if os.name == "nt":
    # Windows only: constants and ctypes bindings for the kernel32 functions
    # used by the named-pipe transport.
    import ctypes.wintypes

    wintypes = ctypes.wintypes
    # Values passed to CreateFile when opening the pipe.
    GENERIC_READ = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_FLAG_OVERLAPPED = 0x40000000
    OPEN_EXISTING = 3
    # Value CreateFile returns on failure, in the form ctypes reports it.
    INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
    # Flags for FormatMessage.
    FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
    FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
    FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
    # Wait-function results and timeout values.
    # NOTE(review): the WAIT_* and INFINITE values are not referenced in the
    # visible part of this file.
    WAIT_FAILED = 0xFFFFFFFF
    WAIT_TIMEOUT = 0x00000102
    WAIT_OBJECT_0 = 0x00000000
    WAIT_IO_COMPLETION = 0x000000C0
    INFINITE = 0xFFFFFFFF
    # Overlapped I/O operation is in progress. (997)
    ERROR_IO_PENDING = 0x000003E5
    ERROR_PIPE_BUSY = 231
    # No process is on the other end of the pipe error on Windows
    ERROR_NO_PROCESS_ON_OTHER_END_OF_PIPE = 233
    # The pointer size follows the architecture
    # We use WPARAM since this type is already conditionally defined
    ULONG_PTR = ctypes.wintypes.WPARAM

    class OVERLAPPED(ctypes.Structure):
        # ctypes layout of the OVERLAPPED structure handed to ReadFile,
        # WriteFile and GetOverlappedResult; hEvent carries the event handle
        # associated with the operation.
        _fields_ = [
            ("Internal", ULONG_PTR),
            ("InternalHigh", ULONG_PTR),
            ("Offset", wintypes.DWORD),
            ("OffsetHigh", wintypes.DWORD),
            ("hEvent", wintypes.HANDLE),
        ]

    LPDWORD = ctypes.POINTER(wintypes.DWORD)
    # Each binding below declares argtypes/restype explicitly so ctypes
    # converts arguments and return values with the declared widths.
    CreateFile = ctypes.windll.kernel32.CreateFileW
    CreateFile.argtypes = [
        wintypes.LPCWSTR,
        wintypes.DWORD,
        wintypes.DWORD,
        wintypes.LPVOID,
        wintypes.DWORD,
        wintypes.DWORD,
        wintypes.HANDLE,
    ]
    CreateFile.restype = wintypes.HANDLE
    CloseHandle = ctypes.windll.kernel32.CloseHandle
    CloseHandle.argtypes = [wintypes.HANDLE]
    CloseHandle.restype = wintypes.BOOL
    ReadFile = ctypes.windll.kernel32.ReadFile
    ReadFile.argtypes = [
        wintypes.HANDLE,
        wintypes.LPVOID,
        wintypes.DWORD,
        LPDWORD,
        ctypes.POINTER(OVERLAPPED),
    ]
    ReadFile.restype = wintypes.BOOL
    WriteFile = ctypes.windll.kernel32.WriteFile
    WriteFile.argtypes = [
        wintypes.HANDLE,
        wintypes.LPVOID,
        wintypes.DWORD,
        LPDWORD,
        ctypes.POINTER(OVERLAPPED),
    ]
    WriteFile.restype = wintypes.BOOL
    GetLastError = ctypes.windll.kernel32.GetLastError
    GetLastError.argtypes = []
    GetLastError.restype = wintypes.DWORD
    SetLastError = ctypes.windll.kernel32.SetLastError
    SetLastError.argtypes = [wintypes.DWORD]
    SetLastError.restype = None
    FormatMessage = ctypes.windll.kernel32.FormatMessageW
    FormatMessage.argtypes = [
        wintypes.DWORD,
        wintypes.LPVOID,
        wintypes.DWORD,
        wintypes.DWORD,
        ctypes.POINTER(wintypes.LPCWSTR),
        wintypes.DWORD,
        wintypes.LPVOID,
    ]
    FormatMessage.restype = wintypes.DWORD
    # NOTE(review): LocalFree is the only binding here without declared
    # argtypes/restype - confirm the ctypes defaults are adequate.
    LocalFree = ctypes.windll.kernel32.LocalFree
    GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
    GetOverlappedResult.argtypes = [
        wintypes.HANDLE,
        ctypes.POINTER(OVERLAPPED),
        LPDWORD,
        wintypes.BOOL,
    ]
    GetOverlappedResult.restype = wintypes.BOOL
    CreateEvent = ctypes.windll.kernel32.CreateEventW
    CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL, wintypes.LPCWSTR]
    CreateEvent.restype = wintypes.HANDLE
    PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
    PeekNamedPipe.argtypes = [
        wintypes.HANDLE,
        wintypes.LPVOID,
        wintypes.DWORD,
        LPDWORD,
        LPDWORD,
        LPDWORD,
    ]
    PeekNamedPipe.restype = wintypes.BOOL
    WaitNamedPipe = ctypes.windll.kernel32.WaitNamedPipeW
    WaitNamedPipe.argtypes = [wintypes.LPCWSTR, wintypes.DWORD]
    WaitNamedPipe.restype = wintypes.BOOL
def _win32_strerror(err):
    """Expand a win32 error code into a human readable message."""
    format_flags = (
        FORMAT_MESSAGE_FROM_SYSTEM
        | FORMAT_MESSAGE_ALLOCATE_BUFFER
        | FORMAT_MESSAGE_IGNORE_INSERTS
    )
    # FormatMessage allocates the text itself and stores the pointer here.
    message_ptr = ctypes.c_wchar_p()
    FormatMessage(format_flags, None, err, 0, message_ptr, 0, None)
    try:
        return message_ptr.value
    finally:
        # Hand the buffer allocated by FormatMessage back to the system.
        LocalFree(message_ptr)
class WindowsNamedPipeTransport(Transport):
    """
    Transport that connects to a Windows named pipe and performs overlapped
    (asynchronous) reads and writes, waiting for each one to complete.
    """

    def __init__(self, sockpath):
        """
        Open ``\\\\.\\pipe\\<sockpath>`` and create the events used for I/O.

        Raises NailgunException (CONNECT_FAILED) if the pipe cannot be opened,
        if waiting for a busy pipe times out, or if an event cannot be created.
        """
        # Pre-set every handle so that close() is safe even when construction
        # fails part-way through.
        self.pipe = None
        self.read_waitable = None
        self.write_waitable = None
        self.sockpath = u"\\\\.\\pipe\\{0}".format(sockpath)
        while True:
            handle = CreateFile(
                self.sockpath,
                GENERIC_READ | GENERIC_WRITE,
                0,
                None,
                OPEN_EXISTING,
                FILE_FLAG_OVERLAPPED,
                None,
            )
            # Capture the error code immediately after the call it belongs to.
            err1 = GetLastError()
            if handle != INVALID_HANDLE_VALUE:
                self.pipe = handle
                break
            if err1 != ERROR_PIPE_BUSY:
                raise NailgunException(
                    _win32_strerror(err1), NailgunException.CONNECT_FAILED
                )
            # All pipe instances are busy: wait up to 5 seconds, then retry.
            if not WaitNamedPipe(self.sockpath, 5000):
                raise NailgunException(
                    "time out while waiting for a pipe", NailgunException.CONNECT_FAILED
                )
        # events for the overlapped I/O operations
        for attr_name in ("read_waitable", "write_waitable"):
            event = CreateEvent(None, True, False, None)
            if event is None:
                # Do not leak the pipe (or the first event) on failure.
                self.close()
                raise NailgunException(
                    "CreateEvent failed", NailgunException.CONNECT_FAILED
                )
            setattr(self, attr_name, event)

    def _raise_win_err(self, msg, err):
        """Raise IOError describing win32 error code *err*."""
        raise IOError("%s win32 error code: %d %s" % (msg, err, _win32_strerror(err)))

    def close(self):
        """Close the pipe and both event handles; safe to call repeatedly."""
        if self.pipe:
            CloseHandle(self.pipe)
        self.pipe = None
        if self.read_waitable is not None:
            CloseHandle(self.read_waitable)
        self.read_waitable = None
        if self.write_waitable is not None:
            CloseHandle(self.write_waitable)
        self.write_waitable = None

    def recv_into(self, buffer, nbytes=None):
        """
        Read up to *nbytes* bytes from the pipe into *buffer*.

        *nbytes* defaults to ``len(buffer)``. Returns the number of bytes
        read. Raises NailgunException (CONNECTION_BROKEN) when no process is
        on the other end of the pipe, IOError for other win32 failures.
        """
        if nbytes is None:
            nbytes = len(buffer)
        # we don't use memoryview because OVERLAPPED I/O happens
        # after the method (ReadFile) returns
        buf = ctypes.create_string_buffer(nbytes)
        olap = OVERLAPPED()
        olap.hEvent = self.read_waitable
        immediate = ReadFile(self.pipe, buf, nbytes, None, olap)
        if not immediate:
            # The last-error value is only meaningful when ReadFile reported
            # failure; after a successful call it may be a stale code left
            # behind by an earlier API call.
            err = GetLastError()
            if err == ERROR_NO_PROCESS_ON_OTHER_END_OF_PIPE:
                raise NailgunException(
                    "No process on the other end of pipe",
                    NailgunException.CONNECTION_BROKEN,
                )
            if err != ERROR_IO_PENDING:
                self._raise_win_err("failed to read %d bytes" % nbytes, err)
        # Obtain results, waiting if needed
        nread = wintypes.DWORD()
        if not GetOverlappedResult(self.pipe, olap, nread, True):
            err = GetLastError()
            self._raise_win_err("error while waiting for read", err)
        nread = nread.value
        if not is_py2:
            # Wrap in a memoryview, as python3 does not let you assign from a
            # ctypes.c_char_array slice directly to a memory view, as one is 'c', and one
            # is '<c' struct/buffer proto format.
            buf = compat_memoryview(buf)
        buffer[:nread] = buf[:nread]
        return nread

    def sendall(self, data):
        """
        Write all of *data* to the pipe and return the number of bytes written.

        Raises IOError on a win32 failure or if fewer bytes than requested
        were written.
        """
        olap = OVERLAPPED()
        olap.hEvent = self.write_waitable
        p = (ctypes.c_ubyte * len(data))(*(bytearray(data)))
        immediate = WriteFile(self.pipe, p, len(data), None, olap)
        if not immediate:
            err = GetLastError()
            if err != ERROR_IO_PENDING:
                self._raise_win_err("failed to write %d bytes" % len(data), err)
        # Obtain results, waiting if needed
        nwrote = wintypes.DWORD()
        if not GetOverlappedResult(self.pipe, olap, nwrote, True):
            err = GetLastError()
            self._raise_win_err("error while waiting for write", err)
        nwrote = nwrote.value
        if nwrote != len(data):
            raise IOError("Async wrote less bytes!")
        return nwrote

    def select(self, timeout_secs):
        """
        Poll the pipe until it is readable, exceptional, or *timeout_secs*
        have elapsed; return the last ``(readable, exceptional)`` pair.
        """
        start = monotonic_time_nanos()
        timeout_nanos = timeout_secs * NSEC_PER_SEC
        while True:
            readable, exceptional = self.select_now()
            if (
                readable
                or exceptional
                or monotonic_time_nanos() - start > timeout_nanos
            ):
                return readable, exceptional
            # Sleep a bit to avoid busy looping for no reason.
            time.sleep(0.05)

    def select_now(self):
        """
        Check the pipe once without blocking.

        Returns ``(readable, exceptional)``: readable when PeekNamedPipe
        reports bytes available, exceptional when PeekNamedPipe fails.
        """
        available_total = wintypes.DWORD()
        exceptional = not PeekNamedPipe(self.pipe, None, 0, None, available_total, None)
        readable = available_total.value > 0
        return readable, exceptional
class NailgunConnection(object):
"""Stateful object holding the connection to the Nailgun server."""
def __init__(
    self,
    server_name,
    server_port=None,
    stdin=sys.stdin,
    stdout=sys.stdout,
    stderr=sys.stderr,
    cwd=None,
    heartbeat_interval_sec=DEFAULT_HEARTBEAT_INTERVAL_SEC,
):
    """
    Connect to the server and prepare (but do not start) the worker threads.

    server_name / server_port / cwd are forwarded to make_nailgun_transport().
    stdin, stdout and stderr are the client-side streams to bridge.
    A heartbeat thread is only created when heartbeat_interval_sec > 0.
    """

    def daemon_worker(entry_point):
        # Every worker receives this connection as its only argument and
        # is marked as a daemon thread.
        worker = Thread(target=entry_point, args=(self,))
        worker.daemon = True
        return worker

    # Connecting comes first: if it raises, nothing else is set up.
    self.transport = make_nailgun_transport(server_name, server_port, cwd)

    # Client-side streams.
    self.stdin = stdin
    self.stdout = stdout
    self.stderr = stderr

    # Receive-side state.
    self.recv_flags = 0
    self.send_flags = 0
    self.header_buf = ctypes.create_string_buffer(CHUNK_HEADER_LEN)
    self.buf = ctypes.create_string_buffer(BUFSIZE)
    self.exit_code = None

    # Shutdown signalling and the slot workers use to report an exception.
    self.shutdown_event = Event()
    self.error_lock = RLock()
    self.error = None
    self.error_traceback = None

    # Stdin reader thread.
    self.stdin_condition = Condition()
    self.stdin_thread = daemon_worker(stdin_thread_main)

    # Sender thread and its queue.
    self.send_queue = Queue.Queue()
    self.send_condition = Condition()
    self.send_thread = daemon_worker(send_thread_main)

    # Optional heartbeat thread.
    self.heartbeat_interval_sec = heartbeat_interval_sec
    self.heartbeat_condition = Condition()
    self.heartbeat_thread = None
    if heartbeat_interval_sec > 0:
        self.heartbeat_thread = daemon_worker(heartbeat_thread_main)
def send_command(
self, cmd, cmd_args=[], filearg=None, env=os.environ, cwd=os.getcwd()
):
"""
Sends the command and environment to the nailgun server, then loops forever
reading the response until the server sends an exit chunk.
Returns the exit value, or raises NailgunException on error.
"""
try:
return self._send_command_and_read_response(
cmd, cmd_args, filearg, env, cwd
)
except socket.error as e:
re_raise(
NailgunException(
"Server disconnected unexpectedly: {0}".format(e),
NailgunException.CONNECTION_BROKEN,
)
)
def _send_command_and_read_response(self, cmd, cmd_args, filearg, env, cwd):
    """
    Start the worker threads, queue the whole command description and
    process server chunks until an exit code has been received.

    The workers are always signalled to shut down and joined (with a
    timeout) before returning or propagating an exception.
    Returns the exit code sent by the server.
    """
    self.stdin_thread.start()
    self.send_thread.start()
    try:
        if filearg:
            self._send_file_arg(filearg)
        for cmd_arg in cmd_args:
            self._send_chunk(cmd_arg, CHUNKTYPE_ARG)
        self._send_env_var("NAILGUN_FILESEPARATOR", os.sep)
        self._send_env_var("NAILGUN_PATHSEPARATOR", os.pathsep)
        self._send_tty_format(self.stdin)
        self._send_tty_format(self.stdout)
        self._send_tty_format(self.stderr)
        for k, v in env.items():
            self._send_env_var(k, v)
        self._send_chunk(cwd, CHUNKTYPE_DIR)
        # The command chunk goes last, after arguments, environment and cwd.
        self._send_chunk(cmd, CHUNKTYPE_CMD)
        if self.heartbeat_thread is not None:
            self.heartbeat_thread.start()
        while self.exit_code is None:
            self._process_next_chunk()
    finally:
        self.shutdown_event.set()
        # Wake every worker so it notices the shutdown flag.
        with self.stdin_condition:
            self.stdin_condition.notify()
        with self.send_condition:
            self.send_condition.notify()
        if self.heartbeat_thread is not None:
            with self.heartbeat_condition:
                self.heartbeat_condition.notify()
            # An exception above may have been raised before the heartbeat
            # thread was started; join() on a never-started thread raises
            # RuntimeError, which would mask the original error.
            if self.heartbeat_thread.ident is not None:
                self.heartbeat_thread.join(THREAD_TERMINATION_TIMEOUT_SEC)
        self.stdin_thread.join(THREAD_TERMINATION_TIMEOUT_SEC)
        self.send_thread.join(THREAD_TERMINATION_TIMEOUT_SEC)
    return self.exit_code
def _process_next_chunk(self):
    """
    Waits (bounded) for server data, handles one chunk if any arrived, and
    surfaces failures reported by the transport or by a worker thread.
    """
    has_data, is_broken = self.transport.select(SELECT_MAX_BLOCK_TIME_SEC)
    if has_data:
        self._process_nailgun_stream()
    if is_broken:
        raise NailgunException(
            "Server disconnected in select", NailgunException.CONNECTION_BROKEN
        )
    if not self.shutdown_event.is_set():
        return
    # A worker thread requested shutdown: if it stored an exception,
    # rethrow it here on the calling thread.
    with self.error_lock:
        pending_error = self.error
        pending_traceback = self.error_traceback
    if pending_error is not None:
        re_raise(pending_error, pending_traceback)
def _send_chunk(self, buf, chunk_type):
"""
Send chunk to the server asynchronously
"""
self.send_queue.put((chunk_type, buf))
with self.send_condition:
self.send_condition.notify()
def _send_env_var(self, name, value):
    """
    Queue one environment variable, encoded as KEY=VALUE, for the server.
    """
    assignment = "=".join([name, value])
    self._send_chunk(assignment, CHUNKTYPE_ENV)
def _send_tty_format(self, f):
"""
Sends a NAILGUN_TTY_# environment variable.
"""
if not f or not hasattr(f, "fileno") or isinstance(f, BytesIO):
return
try:
fileno = f.fileno()
isatty = os.isatty(fileno)
self._send_env_var("NAILGUN_TTY_" + str(fileno), str(int(isatty)))
except UnsupportedOperation:
return
def _send_file_arg(self, filename):
    """
    Sends the contents of a file to the server as a sequence of
    CHUNKTYPE_LONGARG chunks, each at most len(self.buf) bytes long.
    """
    chunk_size = len(self.buf)
    # Binary mode: the content is forwarded byte-for-byte. A text-mode file
    # has no readinto() on Python 3 and translates newlines on Windows.
    # NOTE(review): on Python 3 the queued payload is bytes - confirm that
    # the sender thread's to_bytes() accepts bytes.
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            self._send_chunk(chunk, CHUNKTYPE_LONGARG)
def _recv_to_fd(self, dest_file, num_bytes):
"""
Receives num_bytes bytes from the nailgun socket and copies them to the specified file
object. Used to route data to stdout or stderr on the client.
"""
bytes_read = 0
dest_fd = dest_file
flush = False
if dest_file and hasattr(dest_file, 'buffer'):
dest_fd = dest_file.buffer
flush = True
# Make sure we've written anything that already existed in the buffer
dest_fd.flush()
while bytes_read < num_bytes:
bytes_to_read = min(len(self.buf), num_bytes - bytes_read)
bytes_received = self.transport.recv_into(self.buf, bytes_to_read)
if dest_fd:
dest_fd.write(self.buf[:bytes_received])
if flush:
dest_fd.flush()
bytes_read += bytes_received
def _recv_to_buffer(self, num_bytes, buf):
    """
    Fills buf with exactly num_bytes bytes read from the nailgun socket.
    """
    # recv_into() with an offset needs memoryview(), which Python 2.6 does
    # not have; on such hosts fall back to the variant that copies.
    if not HAS_MEMORYVIEW:
        self._recv_to_buffer_with_copy(num_bytes, buf)
        return
    self._recv_into_memoryview(num_bytes, compat_memoryview(buf))
def _recv_into_memoryview(self, num_bytes, buf_view):
"""
Receives num_bytes from the nailgun socket and writes them into the specified memoryview
to avoid an extra copy.
"""
bytes_read = 0
while bytes_read < num_bytes:
bytes_received = self.transport.recv_into(
buf_view[bytes_read:], num_bytes - bytes_read
)
if not bytes_received:
raise NailgunException(
"Server unexpectedly disconnected in recv_into()",
NailgunException.CONNECTION_BROKEN,
)
bytes_read += bytes_received
def _recv_to_buffer_with_copy(self, num_bytes, buf):
"""
Receives num_bytes from the nailgun socket and writes them into the specified buffer.
"""
bytes_read = 0
while bytes_read < num_bytes:
recv_buf = self.transport.recv(num_bytes - bytes_read)
if not len(recv_buf):
raise NailgunException(
"Server unexpectedly disconnected in recv()",
NailgunException.CONNECTION_BROKEN,
)
buf[bytes_read : bytes_read + len(recv_buf)] = recv_buf
bytes_read += len(recv_buf)
def _process_exit(self, exit_len):
"""
Receives an exit code from the nailgun server and sets nailgun_connection.exit_code
to indicate the client should exit.
"""
num_bytes = min(len(self.buf), exit_len)
self._recv_to_buffer(num_bytes, self.buf)
self.exit_code = int(self.buf.raw[:num_bytes])
def _send_heartbeat(self):
    """
    Queues a heartbeat chunk (with an empty payload) to tell the nailgun
    server that the client is still alive.
    """
    empty_payload = ""
    self._send_chunk(empty_payload, CHUNKTYPE_HEARTBEAT)
def _process_nailgun_stream(self):
    """
    Reads one chunk header from the nailgun server and dispatches on the
    chunk type.
    """
    header = self.header_buf
    self._recv_to_buffer(len(header), header)
    chunk_len, chunk_type = struct.unpack_from(">ic", header.raw)

    if chunk_type == CHUNKTYPE_STDOUT:
        self._recv_to_fd(self.stdout, chunk_len)
        return
    if chunk_type == CHUNKTYPE_STDERR:
        self._recv_to_fd(self.stderr, chunk_len)
        return
    if chunk_type == CHUNKTYPE_EXIT:
        self._process_exit(chunk_len)
        return
    if chunk_type == CHUNKTYPE_SENDINPUT:
        # signal stdin thread to get and send more data
        with self.stdin_condition:
            self.stdin_condition.notify()
        return

    raise NailgunException(
        "Unexpected chunk type: {0}".format(chunk_type),
        NailgunException.UNEXPECTED_CHUNKTYPE,
    )
def wait_termination(self, timeout):
    """
    Block on the send condition until the shutdown event is observed or
    timeout seconds have passed.
    Return True if termination was signalled, False otherwise.
    """
    remaining = timeout
    began = monotonic_time_nanos()
    with self.send_condition:
        while not self.shutdown_event.is_set():
            self.send_condition.wait(remaining)
            elapsed = (monotonic_time_nanos() - began) * 1.0 / NSEC_PER_SEC
            remaining = timeout - elapsed
            if remaining <= 0:
                return False
        return True
def __enter__(self):
    """Context-manager entry: return this connection itself."""
    return self
def __exit__(self, type, value, traceback):
try:
self.transport.close()
except socket.error:
pass
def monotonic_time_nanos():
    """Return a monotonically-increasing timestamp in nanoseconds.

    The epoch is undefined, so only the difference between two calls is
    meaningful.
    """
    # Fallback for unsupported platforms; the platform-specific section that
    # follows rebinds this name where an implementation exists.
    system_name = platform.system()
    raise Exception("Unsupported platform: " + system_name)
# Platform-specific implementations of monotonic_time_nanos(). Each branch
# rebinds the module-level name to a function returning an integer number of
# nanoseconds. Floor division is used throughout: true division would yield a
# float on Python 3 and lose precision for large counter values.
if platform.system() == "Linux":
    # From <linux/time.h>, available since 2.6.28 (released 24-Dec-2008).
    CLOCK_MONOTONIC_RAW = 4
    librt = ctypes.CDLL("librt.so.1", use_errno=True)
    clock_gettime = librt.clock_gettime

    class struct_timespec(ctypes.Structure):
        _fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)]

    clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(struct_timespec)]

    def _monotonic_time_nanos_linux():
        """Read CLOCK_MONOTONIC_RAW via clock_gettime(); raise OSError on failure."""
        t = struct_timespec()
        if clock_gettime(CLOCK_MONOTONIC_RAW, ctypes.byref(t)) != 0:
            # Do not silently return the zero-initialised timespec.
            errno_value = ctypes.get_errno()
            raise OSError(errno_value, os.strerror(errno_value))
        return t.tv_sec * NSEC_PER_SEC + t.tv_nsec

    monotonic_time_nanos = _monotonic_time_nanos_linux
elif platform.system() == "Darwin":
    # From <mach/mach_time.h>
    KERN_SUCCESS = 0
    libSystem = ctypes.CDLL("/usr/lib/libSystem.dylib", use_errno=True)
    mach_timebase_info = libSystem.mach_timebase_info

    class struct_mach_timebase_info(ctypes.Structure):
        _fields_ = [("numer", ctypes.c_uint32), ("denom", ctypes.c_uint32)]

    mach_timebase_info.argtypes = [ctypes.POINTER(struct_mach_timebase_info)]
    # The numer/denom pair is queried once, at import time.
    mach_ti = struct_mach_timebase_info()
    ret = mach_timebase_info(ctypes.byref(mach_ti))
    if ret != KERN_SUCCESS:
        raise Exception("Could not get mach_timebase_info, error: " + str(ret))
    mach_absolute_time = libSystem.mach_absolute_time
    mach_absolute_time.restype = ctypes.c_uint64

    def _monotonic_time_nanos_darwin():
        """Scale mach_absolute_time() by the timebase numer/denom."""
        return (mach_absolute_time() * mach_ti.numer) // mach_ti.denom

    monotonic_time_nanos = _monotonic_time_nanos_darwin
elif platform.system() == "Windows":
    # From <Winbase.h>
    perf_frequency = ctypes.c_uint64()
    ctypes.windll.kernel32.QueryPerformanceFrequency(ctypes.byref(perf_frequency))

    def _monotonic_time_nanos_windows():
        """Scale the performance counter by the frequency queried at import."""
        perf_counter = ctypes.c_uint64()
        ctypes.windll.kernel32.QueryPerformanceCounter(ctypes.byref(perf_counter))
        return perf_counter.value * NSEC_PER_SEC // perf_frequency.value

    monotonic_time_nanos = _monotonic_time_nanos_windows
elif sys.platform == "cygwin":
    k32 = ctypes.CDLL("Kernel32", use_errno=True)
    perf_frequency = ctypes.c_uint64()
    k32.QueryPerformanceFrequency(ctypes.byref(perf_frequency))

    def _monotonic_time_nanos_cygwin():
        """Scale the performance counter by the frequency queried at import."""
        perf_counter = ctypes.c_uint64()
        k32.QueryPerformanceCounter(ctypes.byref(perf_counter))
        return perf_counter.value * NSEC_PER_SEC // perf_frequency.value

    monotonic_time_nanos = _monotonic_time_nanos_cygwin
def send_thread_main(conn):
    """
    Body of the sender thread: drains conn.send_queue and writes each chunk
    (header, then payload) to the server until shutdown is signalled.
    """
    try:
        # These chunk types are not required for the server to accept and
        # process; it may terminate at any time without waiting for them.
        optional_chunk_types = (
            CHUNKTYPE_HEARTBEAT,
            CHUNKTYPE_STDIN,
            CHUNKTYPE_STDIN_EOF,
        )
        header = ctypes.create_string_buffer(CHUNK_HEADER_LEN)
        while True:
            while not conn.send_queue.empty():
                # Only this thread takes items off the queue, so a blocking
                # get() cannot stall once empty() has returned False.
                chunk_type, payload = conn.send_queue.get()
                body = to_bytes(payload)
                struct.pack_into(">ic", header, 0, len(body), chunk_type)
                try:
                    conn.transport.sendall(header.raw)
                    conn.transport.sendall(body)
                except socket.error:
                    # The server may send its termination signal and close the
                    # socket immediately; writing to such a socket (e.g. a
                    # heartbeat) then fails. The protocol is not duplex, so the
                    # server does not wait for the client to acknowledge.
                    # Ignore the failure if termination follows shortly.
                    droppable = chunk_type in optional_chunk_types
                    if droppable and conn.wait_termination(
                        SEND_THREAD_WAIT_TERMINATION_SEC
                    ):
                        return
                    raise
            with conn.send_condition:
                if conn.shutdown_event.is_set():
                    return
                # Sleep only while the queue is still empty; otherwise go
                # straight back to draining it.
                if conn.send_queue.empty():
                    conn.send_condition.wait()
                    if conn.shutdown_event.is_set():
                        return
    except Exception as exc:
        # save exception to rethrow on main thread
        with conn.error_lock:
            conn.error = exc
            conn.error_traceback = sys.exc_info()[2]
        conn.shutdown_event.set()
def stdin_thread_main(conn):
    """
    Stdin thread reading worker function.

    Each time the stdin condition is notified (and shutdown has not been
    signalled), reads one line from conn.stdin and queues it as a STDIN chunk.
    An empty STDIN_EOF chunk is queued instead when there is no stdin or when
    end of input has been reached. Any exception is stored on conn for the
    main thread and shutdown is signalled.
    """
    try:
        eof = False
        while True:
            # wait for signal to read new line from stdin or shutdown
            # we do not start reading from stdin before server actually requests that
            with conn.stdin_condition:
                if conn.shutdown_event.is_set():
                    return
                conn.stdin_condition.wait()
                if conn.shutdown_event.is_set():
                    return
            if not conn.stdin or eof:
                # Always send an explicit empty payload: no line has been read
                # yet when stdin is missing on the very first request, so
                # reusing the last-read buffer would raise UnboundLocalError.
                conn._send_chunk("", CHUNKTYPE_STDIN_EOF)
                continue
            buf = conn.stdin.readline()
            # Truthiness detects end of input for text ("") and binary (b"")
            # streams alike.
            if not buf:
                eof = True
                conn._send_chunk("", CHUNKTYPE_STDIN_EOF)
                continue
            conn._send_chunk(buf, CHUNKTYPE_STDIN)
    except Exception as e:
        # save exception to rethrow on main thread
        with conn.error_lock:
            conn.error = e
            conn.error_traceback = sys.exc_info()[2]
        conn.shutdown_event.set()
def heartbeat_thread_main(conn):
    """
    Body of the heartbeat thread: queues a heartbeat every
    conn.heartbeat_interval_sec seconds until shutdown is signalled.
    """
    try:
        while True:
            with conn.heartbeat_condition:
                stop_requested = conn.shutdown_event.is_set()
                if not stop_requested:
                    # Sleep one interval; a notify() ends the wait early.
                    conn.heartbeat_condition.wait(conn.heartbeat_interval_sec)
                    stop_requested = conn.shutdown_event.is_set()
            if stop_requested:
                return
            conn._send_heartbeat()
    except Exception as exc:
        # save exception to rethrow on main thread
        with conn.error_lock:
            conn.error = exc
            conn.error_traceback = sys.exc_info()[2]
        conn.shutdown_event.set()
def make_nailgun_transport(nailgun_server, nailgun_port=None, cwd=None):
    """
    Creates and returns a transport connected to the nailgun server.

    A nailgun_server of the form "local:<address>" selects a named pipe on
    Windows and an AF_UNIX socket elsewhere; for the socket case the process
    temporarily changes into cwd (when given) while connecting. Any other
    value is resolved with getaddrinfo() together with nailgun_port, and the
    first address that accepts a connection is used.

    Raises NailgunException with SOCKET_FAILED or CONNECT_FAILED on failure.
    """
    local_prefix = "local:"
    transport = None
    if nailgun_server.startswith(local_prefix):
        local_addr = nailgun_server[len(local_prefix):]
        if platform.system() == "Windows":
            transport = WindowsNamedPipeTransport(local_addr)
        else:
            try:
                s = socket.socket(socket.AF_UNIX)
            except socket.error as msg:
                re_raise(
                    NailgunException(
                        "Could not create local socket connection to server: {0}".format(
                            msg
                        ),
                        NailgunException.SOCKET_FAILED,
                    )
                )
            # Only look up the current directory when it has to be restored.
            prev_cwd = os.getcwd() if cwd is not None else None
            try:
                if cwd is not None:
                    os.chdir(cwd)
                s.connect(local_addr)
                transport = UnixTransport(s)
            except socket.error as msg:
                # Do not leak the socket when the connection attempt fails.
                s.close()
                re_raise(
                    NailgunException(
                        "Could not connect to local server at {0}: {1}".format(
                            local_addr, msg
                        ),
                        NailgunException.CONNECT_FAILED,
                    )
                )
            finally:
                if cwd is not None:
                    os.chdir(prev_cwd)
    else:
        for (af, socktype, proto, _, sa) in socket.getaddrinfo(
            nailgun_server, nailgun_port, socket.AF_UNSPEC, socket.SOCK_STREAM
        ):
            try:
                s = socket.socket(af, socktype, proto)
            except socket.error:
                # This address family/type is unusable here; try the next one.
                continue
            try:
                s.connect(sa)
                transport = UnixTransport(s)
            except socket.error:
                s.close()
                continue
            break
    if transport is None:
        raise NailgunException(
            "Could not connect to server {0}:{1}".format(nailgun_server, nailgun_port),
            NailgunException.CONNECT_FAILED,
        )
    return transport
if is_py2:
    # The three-argument raise statement below is a syntax error on Python 3,
    # so the Python 2 implementation is kept in a string and only compiled
    # when running on Python 2.
    exec(
        '''
def re_raise(ex, ex_trace = None):
    """
    Throw ex and preserve stack trace of original exception if we run on Python 2
    """
    if ex_trace is None:
        ex_trace = sys.exc_info()[2]
    raise ex, None, ex_trace
'''
    )
else:

    def re_raise(ex, ex_trace=None):
        """
        Raise ex.

        ex_trace is accepted so the signature matches the Python 2 variant,
        but it is not used by this implementation.
        """
        raise ex
def main():
    """
    Command-line entry point of the nailgun client: parse the options, run
    the command on the server and exit with the code the server reports.
    """
    # Defaults come from the environment and can be overridden by options.
    environ = os.environ
    default_nailgun_server = environ.get("NAILGUN_SERVER", "127.0.0.1")
    default_nailgun_port = int(environ.get("NAILGUN_PORT", NAILGUN_PORT_DEFAULT))

    parser = optparse.OptionParser(usage="%prog [options] cmd arg1 arg2 ...")
    parser.add_option("--nailgun-server", default=default_nailgun_server)
    parser.add_option("--nailgun-port", type="int", default=default_nailgun_port)
    parser.add_option("--nailgun-filearg")
    parser.add_option("--nailgun-showversion", action="store_true")
    parser.add_option("--nailgun-help", action="help")
    options, args = parser.parse_args()

    if options.nailgun_showversion:
        print("NailGun client version " + NAILGUN_VERSION)

    # The first positional argument names the command; without one, the name
    # this script was invoked as is used instead.
    cmd = args.pop(0) if args else os.path.basename(sys.argv[0])
    # Pass any remaining command line arguments to the server.
    cmd_args = args

    try:
        connection = NailgunConnection(
            options.nailgun_server, server_port=options.nailgun_port
        )
        with connection as c:
            exit_code = c.send_command(cmd, cmd_args, options.nailgun_filearg)
            sys.exit(exit_code)
    except NailgunException as e:
        sys.stderr.write(str(e))
        sys.exit(e.code)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()