Source code for jeepney.io.blocking

"""Synchronous IO wrappers around jeepney
"""
import array
from collections import deque
from errno import ECONNRESET
import functools
from itertools import count
import os
from selectors import DefaultSelector, EVENT_READ
import socket
import time
from typing import Optional
from warnings import warn

from jeepney import Parser, Message, MessageType, HeaderFields
from jeepney.auth import Authenticator, BEGIN
from jeepney.bus import get_bus
from jeepney.fds import FileDescriptor, fds_buf_size
from jeepney.wrappers import ProxyBase, unwrap_msg
from jeepney.routing import Router
from jeepney.bus_messages import message_bus
from .common import MessageFilters, FilterHandle, check_replyable

# Public names exported by a star-import of this module.
__all__ = [
    'open_dbus_connection',
    'DBusConnection',
    'Proxy',
]


class _Future:
    """Minimal future-like holder for one result or one exception.

    The outcome is stored as a ``(success, value)`` pair; ``None`` means
    nothing has been set yet.
    """

    def __init__(self):
        self._result = None

    def done(self):
        """Return True once a result or an exception has been stored."""
        # The stored pair is never empty, so "is not None" is equivalent to
        # testing its truthiness.
        return self._result is not None

    def set_exception(self, exception):
        """Store *exception* so that :meth:`result` raises it."""
        self._result = (False, exception)

    def set_result(self, result):
        """Store *result* so that :meth:`result` returns it."""
        self._result = (True, result)

    def result(self):
        """Return the stored value, or raise the stored exception."""
        ok, payload = self._result
        if not ok:
            raise payload
        return payload


def timeout_to_deadline(timeout):
    """Convert a relative timeout in seconds to a monotonic-clock deadline.

    ``None`` (no time limit) is passed through unchanged.
    """
    if timeout is None:
        return None
    now = time.monotonic()
    return now + timeout

def deadline_to_timeout(deadline):
    """Convert a monotonic-clock deadline to the seconds remaining until it.

    ``None`` (no deadline) is passed through unchanged. A deadline that has
    already passed gives ``0.0`` rather than a negative number.
    """
    if deadline is None:
        return None
    remaining = deadline - time.monotonic()
    return max(remaining, 0.)


class DBusConnectionBase:
    """Connection machinery shared by this module and threading

    Wraps a socket together with a message parser, a counter for outgoing
    serial numbers and a selector used to wait for the socket to become
    readable. Usable as a context manager; leaving the ``with`` block closes
    the connection.
    """

    def __init__(self, sock: socket.socket, enable_fds=False):
        self.sock = sock
        self.enable_fds = enable_fds
        self.parser = Parser()
        # Serial numbers for outgoing messages count up from 1.
        self.outgoing_serial = count(start=1)
        # The selector is only used to wait (with a timeout) for readability.
        self.selector = DefaultSelector()
        self.select_key = self.selector.register(sock, EVENT_READ)
        self.unique_name = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Never suppress an exception raised inside the with block.
        return False

    def _serialise(self, message: Message, serial) -> (bytes, Optional[array.array]):
        """Serialise *message*, returning ``(data, fds)``.

        If *serial* is None, the next number from ``outgoing_serial`` is
        used. *fds* is an ``array.array('i')`` passed to the serialiser when
        file descriptor passing is enabled, otherwise None.
        """
        fd_array = None
        if self.enable_fds:
            fd_array = array.array('i')
        if serial is None:
            serial = next(self.outgoing_serial)
        return message.serialise(serial=serial, fds=fd_array), fd_array

    def _send_with_fds(self, data, fds):
        """Send *data* with *fds* attached as SCM_RIGHTS ancillary data."""
        ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]
        n_sent = self.sock.sendmsg([data], ancillary)
        # If sendmsg succeeds, I think ancillary data has been sent atomically?
        # So now we just need to send any leftover normal data.
        unsent = len(data) - n_sent
        if unsent > 0:
            self.sock.sendall(data[n_sent:])

    def _receive(self, deadline):
        """Return the next complete message, reading more data as needed.

        *deadline* is a ``time.monotonic()`` value or None for no limit.
        TimeoutError from :meth:`_read_some_data` propagates to the caller.
        """
        # Check the parser first, so an already-buffered message is returned
        # without touching the socket.
        msg = self.parser.get_next_message()
        while msg is None:
            chunk, fds = self._read_some_data(
                timeout=deadline_to_timeout(deadline)
            )
            self.parser.add_data(chunk, fds=fds)
            msg = self.parser.get_next_message()
        return msg

    def _read_some_data(self, timeout=None):
        """Wait up to *timeout* seconds for data; return ``(bytes, fds)``.

        Raises TimeoutError if the socket does not become readable in time.
        """
        for key, _events in self.selector.select(timeout):
            if key != self.select_key:
                continue
            if self.enable_fds:
                return self._read_with_fds()
            return unwrap_read(self.sock.recv(4096)), []

        # select() reported nothing for our socket before the timeout.
        raise TimeoutError

    def _read_with_fds(self):
        """Read data plus any file descriptors sent alongside it.

        Closes the connection and raises RuntimeError if the MSG_CTRUNC flag
        is set on the received message.
        """
        wanted = self.parser.bytes_desired()
        payload, ancdata, msg_flags, _addr = self.sock.recvmsg(
            wanted, fds_buf_size()
        )
        # MSG_CTRUNC may not be defined on every platform; 0 disables the check.
        ctrunc = getattr(socket, 'MSG_CTRUNC', 0)
        if msg_flags & ctrunc:
            self.close()
            raise RuntimeError("Unable to receive all file descriptors")
        return unwrap_read(payload), FileDescriptor.from_ancdata(ancdata)

    def close(self):
        """Close the connection"""
        self.selector.close()
        self.sock.close()


class DBusConnection(DBusConnectionBase):
    """Blocking D-Bus connection.

    The socket passed in is expected to be connected and authenticated
    already (:func:`prep_socket` does this). Creating an instance calls
    ``Hello`` on the message bus and stores the first item of the reply as
    ``unique_name``.
    """

    def __init__(self, sock: socket.socket, enable_fds=False):
        super().__init__(sock, enable_fds)

        # Message routing machinery. The router is the old interface, kept
        # for backwards compat (see the deprecated ``router`` property).
        self._router = Router(_Future)
        self._filters = MessageFilters()

        # Say Hello, get our unique name
        self.bus_proxy = Proxy(message_bus, self)
        hello_reply = self.bus_proxy.Hello()
        self.unique_name = hello_reply[0]

    @property
    def router(self):
        """Deprecated access to the internal router; emits a warning."""
        warn("conn.router is deprecated, see the docs for APIs to use instead.",
             stacklevel=2)
        return self._router

    def send(self, message: Message, serial=None):
        """Serialise and send a :class:`~.Message` object"""
        data, fds = self._serialise(message, serial)
        if fds:
            self._send_with_fds(data, fds)
        else:
            self.sock.sendall(data)

    send_message = send  # Backwards compatibility

    def receive(self, *, timeout=None) -> Message:
        """Return the next available message from the connection

        If the data is ready, this will return immediately, even if timeout<=0.
        Otherwise, it will wait for up to timeout seconds, or indefinitely if
        timeout is None. If no message comes in time, it raises TimeoutError.
        """
        return self._receive(timeout_to_deadline(timeout))

    def _dispatch(self, msg):
        """Pass *msg* to the router and append it to each matching filter queue."""
        self._router.incoming(msg)
        for handle in self._filters.matches(msg):
            handle.queue.append(msg)

    def recv_messages(self, *, timeout=None):
        """Receive one message and apply filters

        See :meth:`filter`. Returns nothing.
        """
        self._dispatch(self.receive(timeout=timeout))

    def send_and_get_reply(self, message, *, timeout=None, unwrap=None):
        """Send a message, wait for the reply and return it

        Filters are applied to other messages received before the reply -
        see :meth:`filter`.

        *timeout* is an overall limit in seconds covering all the receives
        needed to find the reply; TimeoutError is raised if it expires.
        Passing *unwrap* is deprecated.
        """
        check_replyable(message)
        deadline = timeout_to_deadline(timeout)

        if unwrap is None:
            unwrap = False
        else:
            warn("Passing unwrap= to .send_and_get_reply() is deprecated and "
                 "will break in a future version of Jeepney.", stacklevel=2)

        # Pick the serial ourselves so we can recognise the matching reply.
        serial = next(self.outgoing_serial)
        self.send_message(message, serial=serial)
        while True:
            msg_in = self.receive(timeout=deadline_to_timeout(deadline))
            reply_to = msg_in.header.fields.get(HeaderFields.reply_serial, -1)
            if reply_to == serial:
                if unwrap:
                    return unwrap_msg(msg_in)
                return msg_in

            # Not the reply
            self._dispatch(msg_in)

    def filter(self, rule, *, queue: Optional[deque] = None, bufsize=1):
        """Create a filter for incoming messages

        Usage::

            with conn.filter(rule) as matches:
                # matches is a deque containing matched messages
                matching_msg = conn.recv_until_filtered(matches)

        :param jeepney.MatchRule rule: Catch messages matching this rule
        :param collections.deque queue: Matched messages will be added to this
        :param int bufsize: If no deque is passed in, create one with this size
        """
        if queue is None:
            queue = deque(maxlen=bufsize)
        return FilterHandle(self._filters, rule, queue)

    def recv_until_filtered(self, queue, *, timeout=None) -> Message:
        """Process incoming messages until one is filtered into queue

        Pops the message from queue and returns it, or raises TimeoutError if
        the optional timeout expires. Without a timeout, this is equivalent to::

            while len(queue) == 0:
                conn.recv_messages()
            return queue.popleft()

        In the other I/O modules, there is no need for this, because messages
        are placed in queues by a separate task.

        :param collections.deque queue: A deque connected by :meth:`filter`
        :param float timeout: Maximum time to wait in seconds
        """
        deadline = timeout_to_deadline(timeout)
        while len(queue) == 0:
            self.recv_messages(timeout=deadline_to_timeout(deadline))
        return queue.popleft()
class Proxy(ProxyBase):
    """A blocking proxy for calling D-Bus methods

    You can call methods on the proxy object, such as ``bus_proxy.Hello()``
    to make a method call over D-Bus and wait for a reply. It will either
    return a tuple of returned data, or raise :exc:`.DBusErrorResponse`.
    The methods available are defined by the message generator you wrap.

    You can set a time limit on a call by passing ``_timeout=`` in the method
    call, or set a default when creating the proxy. The ``_timeout`` argument
    is not passed to the message generator.
    All timeouts are in seconds, and :exc:`TimeoutError` is raised if it
    expires before a reply arrives.

    :param msggen: A message generator object
    :param ~blocking.DBusConnection connection: Connection to send and receive messages
    :param float timeout: Default seconds to wait for a reply, or None for no limit
    """

    def __init__(self, msggen, connection, *, timeout=None):
        super().__init__(msggen)
        self._connection = connection
        self._timeout = timeout

    def __repr__(self):
        extra = '' if (self._timeout is None) else f', timeout={self._timeout}'
        return f"Proxy({self._msggen}, {self._connection}{extra})"

    def _method_call(self, make_msg):
        """Wrap *make_msg* so calling it sends the message and returns the reply body."""
        @functools.wraps(make_msg)
        def inner(*args, **kwargs):
            # _timeout is consumed here; it is not passed to the message generator.
            timeout = kwargs.pop('_timeout', self._timeout)
            msg = make_msg(*args, **kwargs)
            assert msg.header.message_type is MessageType.method_call
            return unwrap_msg(self._connection.send_and_get_reply(
                msg, timeout=timeout
            ))

        return inner
def unwrap_read(b):
    """Raise ConnectionResetError from an empty read.

    Sometimes the socket raises an error itself, sometimes it gives no data.
    I haven't worked out when it behaves each way.

    Non-empty data is returned unchanged.
    """
    if not b:
        raise ConnectionResetError(ECONNRESET, os.strerror(ECONNRESET))
    return b


def prep_socket(addr, enable_fds=False, timeout=2.0) -> socket.socket:
    """Create a socket and authenticate ready to send D-Bus messages

    *timeout* is an overall limit in seconds for connecting and
    authenticating. TimeoutError is raised if it expires. On any failure the
    socket is closed before the exception propagates.
    """
    sock = socket.socket(family=socket.AF_UNIX)

    # To impose the overall auth timeout, we'll update the timeout on the socket
    # before each send/receive. This is ugly, but we can't use the socket for
    # anything else until this has succeeded, so this should be safe.
    deadline = timeout_to_deadline(timeout)

    def with_sock_deadline(meth, *args):
        sock.settimeout(deadline_to_timeout(deadline))
        return meth(*args)

    try:
        with_sock_deadline(sock.connect, addr)
        authr = Authenticator(enable_fds=enable_fds)
        for req_data in authr:
            with_sock_deadline(sock.sendall, req_data)
            authr.feed(unwrap_read(with_sock_deadline(sock.recv, 1024)))
        with_sock_deadline(sock.sendall, BEGIN)
    except socket.timeout as e:
        sock.close()
        raise TimeoutError(f"Did not authenticate in {timeout} seconds") from e
    except BaseException:
        # Cleanup only: close the socket, then let the original exception
        # (including KeyboardInterrupt and friends) carry on.
        sock.close()
        raise

    sock.settimeout(None)  # Put the socket back in blocking mode
    return sock
def open_dbus_connection(
        bus='SESSION', enable_fds=False, auth_timeout=1.,
) -> DBusConnection:
    """Connect to a D-Bus message bus

    Pass ``enable_fds=True`` to allow sending & receiving file descriptors.
    An error will be raised if the bus does not allow this. For simplicity,
    it's advisable to leave this disabled unless you need it.

    D-Bus has an authentication step before sending or receiving messages.
    This takes < 1 ms in normal operation, but there is a timeout so that client
    code won't get stuck if the server doesn't reply. *auth_timeout* configures
    this timeout in seconds.
    """
    bus_addr = get_bus(bus)
    sock = prep_socket(bus_addr, enable_fds, timeout=auth_timeout)

    try:
        return DBusConnection(sock, enable_fds)
    except BaseException:
        # Creating the connection makes a Hello call, which can fail. Nobody
        # else holds the socket at this point, so close it rather than leak it.
        sock.close()
        raise
if __name__ == '__main__':
    # Manual smoke test: connect with the default arguments, print the unique
    # name assigned to the connection, then close it on leaving the block.
    with open_dbus_connection() as conn:
        print("Unique name:", conn.unique_name)