Connecting to DBus and sending messages
Jeepney can be used with several different frameworks, including blocking I/O, multi-threading, Trio and asyncio.
For each of these, there is a module in jeepney.io providing the integration layer.
Here’s an example of sending a desktop notification, using blocking I/O:
from jeepney import DBusAddress, new_method_call
from jeepney.io.blocking import open_dbus_connection

notifications = DBusAddress('/org/freedesktop/Notifications',
                            bus_name='org.freedesktop.Notifications',
                            interface='org.freedesktop.Notifications')

connection = open_dbus_connection(bus='SESSION')

# Construct a new D-Bus message. new_method_call takes the address, the
# method name, the signature string, and a tuple of arguments.
msg = new_method_call(notifications, 'Notify', 'susssasa{sv}i',
                      ('jeepney_test',  # App name
                       0,   # Not replacing any previous notification
                       '',  # Icon
                       'Hello, world!',  # Summary
                       'This is an example notification from Jeepney',  # Body
                       [], {},  # Actions, hints
                       -1,      # expire_timeout (-1 = default)
                      ))

# Send the message and wait for the reply
reply = connection.send_and_get_reply(msg)
print('Notification ID:', reply.body[0])

connection.close()
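To make the signature string 'susssasa{sv}i' easier to read, the sketch below splits a signature into its single complete types so that each one can be lined up with one argument. It is a minimal illustration in plain Python, not part of Jeepney: split_signature and its helper are hypothetical names, and the sketch does not enforce every rule of the D-Bus specification (for instance, it does not check that dict entries only appear inside arrays).

```python
# Type codes that stand alone as one complete type.
BASIC_TYPES = set("ybnqiuxtdsoghv")
# Container openers and the character that closes each of them.
CLOSERS = {"(": ")", "{": "}"}


def _end_of_type(sig, pos):
    """Return the index just past the single complete type starting at pos."""
    if pos >= len(sig):
        raise ValueError("signature ended unexpectedly")
    code = sig[pos]
    if code == "a":
        # An array is 'a' followed by exactly one complete element type.
        return _end_of_type(sig, pos + 1)
    if code in CLOSERS:
        closer = CLOSERS[code]
        pos += 1
        # Consume member types until the matching closer is reached.
        while pos < len(sig) and sig[pos] != closer:
            pos = _end_of_type(sig, pos)
        if pos >= len(sig):
            raise ValueError("unclosed container in signature")
        return pos + 1
    if code in BASIC_TYPES:
        return pos + 1
    raise ValueError(f"unknown type code {code!r}")


def split_signature(sig):
    """Split a D-Bus signature into a list of single complete types."""
    types, pos = [], 0
    while pos < len(sig):
        end = _end_of_type(sig, pos)
        types.append(sig[pos:end])
        pos = end
    return types


if __name__ == "__main__":
    print(split_signature("susssasa{sv}i"))
    # ['s', 'u', 's', 's', 's', 'as', 'a{sv}', 'i']
```

The eight pieces match the eight arguments of Notify: s is a string, u an unsigned 32-bit integer, as an array of strings (the actions), a{sv} a dictionary from strings to variants (the hints), and i a signed 32-bit integer.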
And here is the same thing using asyncio:
"""Send a desktop notification

See also aio_notify.py, which does the same with the higher-level Proxy API.
"""
import asyncio

from jeepney import DBusAddress, new_method_call
from jeepney.io.asyncio import open_dbus_router

notifications = DBusAddress('/org/freedesktop/Notifications',
                            bus_name='org.freedesktop.Notifications',
                            interface='org.freedesktop.Notifications')

async def send_notification():
    msg = new_method_call(notifications, 'Notify', 'susssasa{sv}i',
                          ('jeepney_test',  # App name
                           0,   # Not replacing any previous notification
                           '',  # Icon
                           'Hello, world!',  # Summary
                           'This is an example notification from Jeepney',  # Body
                           [], {},  # Actions, hints
                           -1,      # expire_timeout (-1 = default)
                          ))
    # Send the message and await the reply
    async with open_dbus_router() as router:
        reply = await router.send_and_get_reply(msg)
        print('Notification ID:', reply.body[0])

# asyncio.run() creates the event loop, runs the coroutine and closes the loop
asyncio.run(send_notification())
See the examples folder in Jeepney’s source repository for more examples.
Connections and Routers
Each integration (except blocking I/O) can create connections and routers.
Routers are useful for calling methods in other processes.
Routers let you send a request and wait for a reply, using a proxy or with router.send_and_get_reply().
You can also filter incoming messages into queues, e.g. to wait for a specific
signal. But if messages arrive faster than they are processed, these queues fill
up, and messages may be dropped.
Connections are simpler: they let you send and receive messages, but conn.receive() will give you the next message read, whatever that is.
You’d use this to write a server which responds to incoming messages.
A connection will never discard an incoming message.
Note
For blocking, single-threaded I/O, the connection doubles as a router.
Messages that arrive while you’re waiting for a reply will be filtered, and you can also filter the next message by calling conn.recv_messages().
Routers for the other integrations receive messages in a background task.
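The trade-off between filtered queues that may drop messages and connections that never discard anything can be pictured with a bounded queue. The sketch below uses only the standard library and is not Jeepney's own code; deliver and demo are hypothetical names chosen for illustration. It shows what happens when messages are offered to a full queue without blocking the producer.

```python
import asyncio


def deliver(queue, message):
    """Try to queue a message without blocking; return False if it is dropped."""
    try:
        queue.put_nowait(message)
        return True
    except asyncio.QueueFull:
        # Nobody has consumed the earlier messages yet, so this one is lost.
        return False


async def demo():
    # A small buffer: only two messages can wait to be processed.
    queue = asyncio.Queue(maxsize=2)
    # Four messages arrive before the consumer reads any of them.
    results = [deliver(queue, f"signal {n}") for n in range(4)]
    # Drain whatever made it into the queue.
    kept = [queue.get_nowait() for _ in range(queue.qsize())]
    return results, kept


if __name__ == "__main__":
    results, kept = asyncio.run(demo())
    print(results)  # [True, True, False, False]
    print(kept)     # ['signal 0', 'signal 1']
```

If every message matters, reading each one yourself from a connection avoids this kind of loss, at the cost of handling unrelated messages too.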
Message generators and proxies
If you’re calling a number of different methods, you can make a message generator class containing their definitions. Jeepney includes a tool to generate these classes automatically; see Generating D-Bus wrappers.
Message generators define how to construct messages. Proxies are wrappers around message generators which send a message and get the reply back.
Let’s rewrite the example above to use a message generator and a proxy:
"""Send a desktop notification

See also aio_notify_noproxy.py, which does the same with lower-level APIs
"""
import asyncio

from jeepney import MessageGenerator, new_method_call
from jeepney.io.asyncio import open_dbus_router, Proxy

# ---- Message generator, created by jeepney.bindgen ----
class Notifications(MessageGenerator):
    interface = 'org.freedesktop.Notifications'

    def __init__(self, object_path='/org/freedesktop/Notifications',
                 bus_name='org.freedesktop.Notifications'):
        super().__init__(object_path=object_path, bus_name=bus_name)

    def Notify(self, arg_0, arg_1, arg_2, arg_3, arg_4, arg_5, arg_6, arg_7):
        return new_method_call(self, 'Notify', 'susssasa{sv}i',
                               (arg_0, arg_1, arg_2, arg_3, arg_4, arg_5, arg_6, arg_7))

    def CloseNotification(self, arg_0):
        return new_method_call(self, 'CloseNotification', 'u',
                               (arg_0,))

    def GetCapabilities(self):
        return new_method_call(self, 'GetCapabilities')

    def GetServerInformation(self):
        return new_method_call(self, 'GetServerInformation')
# ---- End auto generated code ----

async def send_notification():
    async with open_dbus_router() as router:
        proxy = Proxy(Notifications(), router)

        resp = await proxy.Notify('jeepney_test',  # App name
                                  0,   # Not replacing any previous notification
                                  '',  # Icon
                                  'Hello, world!',  # Summary
                                  'This is an example notification from Jeepney',
                                  [], {},  # Actions, hints
                                  -1,      # expire_timeout (-1 = default)
                                  )
        print('Notification ID:', resp[0])

if __name__ == '__main__':
    asyncio.run(send_notification())
This is more code for the simple use case here, but in a larger application, collecting the message definitions together like this could make the code clearer.
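The division of labour between a message generator and a proxy can be pictured with a toy model. The sketch below is not Jeepney's implementation: ToyNotifications, ToyRouter and ToyProxy are hypothetical stand-ins, messages are plain dictionaries, and everything is synchronous to keep it short. It only shows the pattern of a generator describing a call while a proxy sends it and hands back the reply.

```python
class ToyNotifications:
    """Toy message generator: each method only describes a call."""
    interface = 'org.freedesktop.Notifications'

    def Notify(self, app_name, replaces_id, summary):
        # Build and return a message; nothing is sent here.
        return {
            'interface': self.interface,
            'member': 'Notify',
            'body': (app_name, replaces_id, summary),
        }


class ToyRouter:
    """Toy router: records what was sent and fabricates a reply body."""

    def __init__(self):
        self.sent = []

    def send_and_get_reply(self, msg):
        self.sent.append(msg)
        return (len(self.sent),)  # Pretend the service returned an ID


class ToyProxy:
    """Toy proxy: wraps every generator method so calling it also sends."""

    def __init__(self, msggen, router):
        self._msggen = msggen
        self._router = router

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself,
        # so look the method up on the wrapped generator.
        make_msg = getattr(self._msggen, name)

        def call(*args, **kwargs):
            msg = make_msg(*args, **kwargs)
            return self._router.send_and_get_reply(msg)

        return call


if __name__ == '__main__':
    router = ToyRouter()
    proxy = ToyProxy(ToyNotifications(), router)
    print(proxy.Notify('jeepney_test', 0, 'Hello, world!'))  # (1,)
```

Because the generator knows nothing about I/O, the same class of message definitions can be reused with whichever integration sends the messages.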
Sending & receiving file descriptors
New in version 0.7.
D-Bus allows sending file descriptors: references to open files, sockets, etc.
To use this, use the blocking, multi-threading or Trio integration and enable it (enable_fds=True) when connecting to D-Bus. If you enable FD support but the message bus can’t or won’t support it, FDNegotiationError will be raised.
To send a file descriptor, pass any object with a .fileno() method, such as an open file or socket, or a suitable integer. The file descriptor must not be closed before the message is sent.
A received file descriptor will be returned as a FileDescriptor object to help avoid leaking FDs. This can easily be converted to a file object (to_file()), a socket (to_socket()) or a plain integer (to_raw_fd()).
from datetime import datetime
from tempfile import TemporaryFile

# Send a file descriptor for a temp file (normally not visible in any folder)
with TemporaryFile() as tf:
    msg = new_method_call(server, 'write_data', 'h', (tf,))
    await router.send_and_get_reply(msg)

# Receive a file descriptor, use it as a writable file
msg = await conn.receive()
fd, = msg.body
with fd.to_file('w') as f:
    f.write(f'Timestamp: {datetime.now()}')
The snippets above are based on the Trio integration. See the examples directory in the Jeepney repository for complete, working examples.
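For background on what sending a file descriptor means, the sketch below passes one between two ends of a Unix socket pair using only the standard library. It involves neither Jeepney nor a message bus, and it assumes Python 3.9 or later on a Unix platform (where socket.send_fds and socket.recv_fds are available); pass_fd_demo is a hypothetical name. The receiver gets its own descriptor referring to the same open file, which is why the sender can read back what the receiver wrote.

```python
import os
import socket
from tempfile import TemporaryFile


def pass_fd_demo():
    """Send a temp file's descriptor over a socket pair and write through it."""
    sender, receiver = socket.socketpair()
    with sender, receiver, TemporaryFile() as tf:
        # The descriptor travels alongside at least one byte of ordinary data.
        socket.send_fds(sender, [b"x"], [tf.fileno()])

        # The receiving side gets a new descriptor for the same open file.
        _data, fds, _flags, _addr = socket.recv_fds(receiver, 1, 1)

        # Wrap the raw descriptor in a file object; closing it closes the copy.
        with os.fdopen(fds[0], "w") as f:
            f.write("hello via fd")

        # Read the content back through the original file object.
        tf.seek(0)
        return tf.read()


if __name__ == "__main__":
    print(pass_fd_demo())  # b'hello via fd'
```

Wrapping the received integer in a file object right away mirrors the reason given above for the FileDescriptor wrapper: a raw descriptor that is never closed is leaked.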