Source code for jeepney.bus_messages

"""Messages for talking to the DBus daemon itself

Generated by jeepney.bindgen and modified by hand.
"""
from .low_level import Message, MessageType, HeaderFields
from .wrappers import MessageGenerator, new_method_call

# Public names of this module: the ones a star-import of it exposes.
__all__ = [
    'DBusNameFlags',
    'DBus',
    'message_bus',
    'Monitoring',
    'Stats',
    'MatchRule',
]

class DBusNameFlags:
    """Bit-flag constants relating to bus names.

    Every value occupies its own bit, so several can be OR-ed together.

    NOTE(review): presumably meant for the *flags* argument of
    ``DBus.RequestName`` -- confirm against the D-Bus specification.
    """
    allow_replacement = 1 << 0
    replace_existing = 1 << 1
    do_not_queue = 1 << 2

class DBus(MessageGenerator):
    """Messages to talk to the message bus

    Every method hands its arguments to ``new_method_call`` together with
    the D-Bus method name and body signature, and returns the result.
    """
    interface = 'org.freedesktop.DBus'

    def __init__(self, object_path='/org/freedesktop/DBus',
                 bus_name='org.freedesktop.DBus'):
        """Default to the object path and bus name of the bus itself."""
        super().__init__(bus_name=bus_name, object_path=object_path)

    def Hello(self):
        """Message calling ``Hello`` (no arguments)."""
        return new_method_call(self, 'Hello')

    def RequestName(self, name, flags=0):
        """Message calling ``RequestName`` with *name* and *flags* (``su``)."""
        body = (name, flags)
        return new_method_call(self, 'RequestName', 'su', body)

    def ReleaseName(self, name):
        """Message calling ``ReleaseName`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'ReleaseName', 's', body)

    def StartServiceByName(self, name):
        """Message calling ``StartServiceByName`` with *name* (``su``).

        The second (``u``) body item is always sent as 0.
        """
        body = (name, 0)
        return new_method_call(self, 'StartServiceByName', 'su', body)

    def UpdateActivationEnvironment(self, env):
        """Message calling ``UpdateActivationEnvironment`` with *env* (``a{ss}``)."""
        body = (env,)
        return new_method_call(
            self, 'UpdateActivationEnvironment', 'a{ss}', body
        )

    def NameHasOwner(self, name):
        """Message calling ``NameHasOwner`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'NameHasOwner', 's', body)

    def ListNames(self):
        """Message calling ``ListNames`` (no arguments)."""
        return new_method_call(self, 'ListNames')

    def ListActivatableNames(self):
        """Message calling ``ListActivatableNames`` (no arguments)."""
        return new_method_call(self, 'ListActivatableNames')

    def AddMatch(self, rule):
        """*rule* can be a str or a :class:`MatchRule` instance"""
        # A MatchRule is turned into its string form; anything else is
        # passed through untouched.
        rule_text = rule.serialise() if isinstance(rule, MatchRule) else rule
        return new_method_call(self, 'AddMatch', 's', (rule_text,))

    def RemoveMatch(self, rule):
        """*rule* can be a str or a :class:`MatchRule` instance"""
        rule_text = rule.serialise() if isinstance(rule, MatchRule) else rule
        return new_method_call(self, 'RemoveMatch', 's', (rule_text,))

    def GetNameOwner(self, name):
        """Message calling ``GetNameOwner`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'GetNameOwner', 's', body)

    def ListQueuedOwners(self, name):
        """Message calling ``ListQueuedOwners`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'ListQueuedOwners', 's', body)

    def GetConnectionUnixUser(self, name):
        """Message calling ``GetConnectionUnixUser`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'GetConnectionUnixUser', 's', body)

    def GetConnectionUnixProcessID(self, name):
        """Message calling ``GetConnectionUnixProcessID`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'GetConnectionUnixProcessID', 's', body)

    def GetAdtAuditSessionData(self, name):
        """Message calling ``GetAdtAuditSessionData`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'GetAdtAuditSessionData', 's', body)

    def GetConnectionSELinuxSecurityContext(self, name):
        """Message calling ``GetConnectionSELinuxSecurityContext`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(
            self, 'GetConnectionSELinuxSecurityContext', 's', body
        )

    def ReloadConfig(self):
        """Message calling ``ReloadConfig`` (no arguments)."""
        return new_method_call(self, 'ReloadConfig')

    def GetId(self):
        """Message calling ``GetId`` (no arguments)."""
        return new_method_call(self, 'GetId')

    def GetConnectionCredentials(self, name):
        """Message calling ``GetConnectionCredentials`` with *name* (``s``)."""
        body = (name,)
        return new_method_call(self, 'GetConnectionCredentials', 's', body)
# Module-level DBus generator built with the default object path and bus name.
message_bus = DBus()
class Monitoring(MessageGenerator):
    """Messages for the ``org.freedesktop.DBus.Monitoring`` interface."""
    interface = 'org.freedesktop.DBus.Monitoring'

    def __init__(self, object_path='/org/freedesktop/DBus',
                 bus_name='org.freedesktop.DBus'):
        """Default to the object path and bus name of the bus itself."""
        super().__init__(bus_name=bus_name, object_path=object_path)

    def BecomeMonitor(self, rules):
        """Convert this connection to a monitor connection (advanced)

        *rules* fills the ``as`` part of the ``asu`` body; the ``u`` part
        is always sent as 0.
        """
        body = (rules, 0)
        return new_method_call(self, 'BecomeMonitor', 'asu', body)
class Stats(MessageGenerator):
    """Messages for the ``org.freedesktop.DBus.Debug.Stats`` interface."""
    interface = 'org.freedesktop.DBus.Debug.Stats'

    def __init__(self, object_path='/org/freedesktop/DBus',
                 bus_name='org.freedesktop.DBus'):
        """Default to the object path and bus name of the bus itself."""
        super().__init__(bus_name=bus_name, object_path=object_path)

    def GetStats(self):
        """Message calling ``GetStats`` (no arguments)."""
        return new_method_call(self, 'GetStats')

    def GetConnectionStats(self, arg0):
        """Message calling ``GetConnectionStats`` with *arg0* (``s``)."""
        body = (arg0,)
        return new_method_call(self, 'GetConnectionStats', 's', body)

    def GetAllMatchRules(self):
        """Message calling ``GetAllMatchRules`` (no arguments)."""
        return new_method_call(self, 'GetAllMatchRules')
class MatchRule:
    """Construct a match rule to subscribe to DBus messages.

    e.g.::

        mr = MatchRule(
            interface='org.freedesktop.DBus',
            member='NameOwnerChanged',
            type='signal'
        )
        msg = message_bus.AddMatch(mr)
        # Send this message to subscribe to the signal

    All constructor arguments are keyword-only. A condition left as
    ``None`` is not part of the rule.

    :param type: a ``MessageType`` member, or the name of one as a str
    :param sender: required value of the sender header field
    :param interface: required value of the interface header field
    :param member: required value of the member header field
    :param path: required value of the path header field
    :param path_namespace: the message path must equal this, or lie below it
    :param destination: required value of the destination header field
    :param eavesdrop: if true, ``eavesdrop='true'`` is added when serialising
    """
    def __init__(self, *, type=None, sender=None, interface=None, member=None,
                 path=None, path_namespace=None, destination=None,
                 eavesdrop=False):
        # Accept the enum member's name as a convenience.
        if isinstance(type, str):
            type = MessageType[type]
        self.message_type = type
        fields = {
            'sender': sender,
            'interface': interface,
            'member': member,
            'path': path,
            'destination': destination,
        }
        # Only the header fields that were actually given take part.
        self.header_fields = {
            k: v for (k, v) in fields.items() if (v is not None)
        }
        self.path_namespace = path_namespace
        self.eavesdrop = eavesdrop
        # Maps argument number -> (expected value, kind).
        self.arg_conditions = {}

    def add_arg_condition(self, argno: int, value: str, kind='string'):
        """Add a condition for a particular argument

        argno: int, 0-63
        kind: 'string', 'path', 'namespace'

        A later condition for the same *argno* replaces the earlier one.

        :raises ValueError: if *kind* is not one of the three names above,
            or if ``kind='namespace'`` is used with an *argno* other than 0
        """
        if kind not in {'string', 'path', 'namespace'}:
            raise ValueError("kind={!r}".format(kind))
        if kind == 'namespace' and argno != 0:
            raise ValueError("argno must be 0 for kind='namespace'")

        self.arg_conditions[argno] = (value, kind)

    def serialise(self) -> str:
        """Convert to a string to use in an AddMatch call to the message bus

        The result is a comma-separated list of ``key='value'`` pairs.
        """
        pairs = list(self.header_fields.items())

        # matches() enforces path_namespace, so the rule sent to the bus has
        # to carry it as well; otherwise the bus-side rule would be broader
        # than this object.
        if self.path_namespace is not None:
            pairs.append(('path_namespace', self.path_namespace))

        if self.message_type:
            pairs.append(('type', self.message_type.name))

        if self.eavesdrop:
            pairs.append(('eavesdrop', 'true'))

        for argno, (val, kind) in self.arg_conditions.items():
            # Plain string conditions use the bare key, e.g. arg0.
            if kind == 'string':
                kind = ''
            pairs.append((f'arg{argno}{kind}', val))

        # Quoting rules: single quotes ('') needed if the value contains a comma.
        # A literal ' can only be represented outside single quotes, by
        # backslash-escaping it. No escaping inside the quotes.
        # The simplest way to handle this is to use '' around every value, and
        # use '\'' (end quote, escaped ', restart quote) for literal ' .
        return ','.join(
            "{}='{}'".format(k, v.replace("'", r"'\''")) for (k, v) in pairs
        )

    def matches(self, msg: 'Message') -> bool:
        """Returns True if msg matches this rule"""
        h = msg.header
        if (self.message_type is not None) and h.message_type != self.message_type:
            return False

        for field, expected in self.header_fields.items():
            if h.fields.get(HeaderFields[field], None) != expected:
                return False

        if self.path_namespace is not None:
            # '\0' stands in for a missing path, so the checks below fail.
            path = h.fields.get(HeaderFields.path, '\0')
            path_ns = self.path_namespace.rstrip('/')
            if not ((path == path_ns) or path.startswith(path_ns + '/')):
                return False

        for argno, (expected, kind) in self.arg_conditions.items():
            # A missing or non-str argument can never satisfy a condition.
            if argno >= len(msg.body):
                return False
            arg = msg.body[argno]
            if not isinstance(arg, str):
                return False
            if kind == 'string':
                if arg != expected:
                    return False
            elif kind == 'path':
                # Equal, or whichever side ends in '/' is a prefix of the
                # other.
                if not (
                    (arg == expected)
                    or (expected.endswith('/') and arg.startswith(expected))
                    or (arg.endswith('/') and expected.startswith(arg))
                ):
                    return False
            elif kind == 'namespace':
                # Equal, or arg is a dotted name below *expected*.
                if not (
                    (arg == expected)
                    or arg.startswith(expected + '.')
                ):
                    return False

        return True