def send_thread_main()

in nailgun-client/py/ng.py [0:0]


def send_thread_main(conn):
    """
    Sending thread worker function
    Waits for data and transmits it to server
    """
    try:
        header_buf = ctypes.create_string_buffer(CHUNK_HEADER_LEN)
        while True:
            connection_error = None
            while not conn.send_queue.empty():
                # only this thread can deplete the queue, so it is safe to use blocking get()
                (chunk_type, buf) = conn.send_queue.get()
                bbuf = to_bytes(buf)
                struct.pack_into(">ic", header_buf, 0, len(bbuf), chunk_type)

                # these chunk types are not required for server to accept and process and server may terminate
                # any time without waiting for them
                is_required = chunk_type not in (
                    CHUNKTYPE_HEARTBEAT,
                    CHUNKTYPE_STDIN,
                    CHUNKTYPE_STDIN_EOF,
                )

                try:
                    conn.transport.sendall(header_buf.raw)
                    conn.transport.sendall(bbuf)
                except socket.error as e:
                    # The server may send termination signal and close the socket immediately; attempt to write
                    # to such a socket (i.e. heartbeats) results in an error (SIGPIPE)
                    # Nailgun protocol is not duplex so the server does not wait on client to acknowledge
                    # We catch an exception and ignore it if termination has happened shortly afterwards
                    if not is_required and conn.wait_termination(
                        SEND_THREAD_WAIT_TERMINATION_SEC
                    ):
                        return
                    raise

            with conn.send_condition:
                if conn.shutdown_event.is_set():
                    return
                if not conn.send_queue.empty():
                    continue
                conn.send_condition.wait()
                if conn.shutdown_event.is_set():
                    return
    except Exception as e:
        # save exception to rethrow on main thread
        with conn.error_lock:
            conn.error = e
            conn.error_traceback = sys.exc_info()[2]
        conn.shutdown_event.set()