From 5530c8c21319d3736a67ac915aa5e630bf5a63ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Val=C3=A9rio?= Date: Wed, 26 Jan 2022 23:45:08 +0000 Subject: [PATCH] Add system notifications with the currently available information --- README.rst | 31 ++++++++++++++++++- clipboard_watcher/app.py | 48 ++++++++++++++++++++++++------ clipboard_watcher/models.py | 2 +- clipboard_watcher/notifications.py | 16 ++++++++++ clipboard_watcher/xoperations.py | 47 ++++++++++++++++++----------- poetry.lock | 13 +++++++- pyproject.toml | 1 + 7 files changed, 129 insertions(+), 29 deletions(-) create mode 100644 clipboard_watcher/notifications.py diff --git a/README.rst b/README.rst index 80499c7..90973f3 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,33 @@ clipboard-watcher ================= -Keep an eye on the apps that are using your clipboard \ No newline at end of file +This repository contains the code of an experiment, in order to understand +how hard would it be (if possible) to have an application that can monitor +the access of other apps to the clipboard on Linux machines. + +At the moment it only supports desktop environments that use X. + +To learn more please read the +`original blog post `_. + +Installation +------------ + +To build and run this demo app, you will need to have the following libraries +on your system: + +* ``libdbus-1-dev`` +* ```libglib2.0-dev``` + +You will also need to have `Poetry `_ in order to be +able to execute the following commands: + +.. code-block:: + + $ poetry install + $ poetry run watcher + +Contributions +------------- + +All contributions and improvements are welcome. \ No newline at end of file diff --git a/clipboard_watcher/app.py b/clipboard_watcher/app.py index b18c8d5..934793c 100644 --- a/clipboard_watcher/app.py +++ b/clipboard_watcher/app.py @@ -1,14 +1,17 @@ import logging import sys from argparse import ArgumentParser +from threading import Thread +from queue import Queue from Xlib import X, display from .models import ClipboardData from .xoperations import process_event_loop +from .notifications import display_desktop_notification -logger = logging.getLogger() +logger = logging.getLogger("ClipboardWatcher") def set_logger_settings(level_name: str) -> None: @@ -16,6 +19,15 @@ def set_logger_settings(level_name: str) -> None: logging.basicConfig(stream=sys.stdout, level=level) +def process_notifications(q: Queue): + while True: + req = q.get(block=True) + display_desktop_notification( + f"New access to {req['selection']}({req['target']}) detected.", + f"Window: {req['window_name']} (id: {req['id']})\nPossible PID: {req['pid']} | Extra Info: {req['extra']}", + ) + + def main() -> None: parser = ArgumentParser( "Monitors the access of other processes to the clipboard contents." @@ -25,21 +37,39 @@ def main() -> None: if args.loglevel and args.loglevel in ["DEBUG", "INFO", "WARNING", "ERROR"]: set_logger_settings(args.loglevel) else: - set_logger_settings("WARNING") + set_logger_settings("INFO") - logger.info("Initializing Xclient") - d = display.Display() + logger.info("Initializing X client") + disp = display.Display() # Create ourselves a window and a property for the returned data - w = d.screen().root.create_window(0, 0, 10, 10, 0, X.CopyFromParent) - w.set_wm_name("clipboard_watcher") + window = disp.screen().root.create_window(0, 0, 10, 10, 0, X.CopyFromParent) + window.set_wm_name("clipboard_watcher") logger.debug("Getting selection data") - cb_data = ClipboardData(d, w, {}, {}) + cb_data = ClipboardData(disp, window, {}, {}) cb_data.refresh_all() logger.debug("Taken ownership of all selections") - logger.info("Will start processing requests") - process_event_loop(d, w, cb_data) + job_queue = Queue() + # Thread 1 + event_worker = Thread( + target=process_event_loop, args=(disp, window, job_queue, cb_data), daemon=True + ) + # Thread 2 + notif_worker = Thread( + target=process_notifications, + args=(job_queue,), + daemon=True, + ) + + event_worker.start() + notif_worker.start() + logger.info("Setup done. Keeping an eye on the clipboard") + try: + event_worker.join() + notif_worker.join() + except KeyboardInterrupt: + logger.info("Shutting down") if __name__ == "__main__": diff --git a/clipboard_watcher/models.py b/clipboard_watcher/models.py index 4b6f5d6..a88dcf9 100644 --- a/clipboard_watcher/models.py +++ b/clipboard_watcher/models.py @@ -6,7 +6,7 @@ from Xlib import X from .xoperations import get_selection_data, get_selection_targets -logger = logging.getLogger(__name__) +logger = logging.getLogger("ClipboardWatcher") @dataclass diff --git a/clipboard_watcher/notifications.py b/clipboard_watcher/notifications.py new file mode 100644 index 0000000..8024ab6 --- /dev/null +++ b/clipboard_watcher/notifications.py @@ -0,0 +1,16 @@ +import dbus +import logging + +logger = logging.getLogger("ClipboardWatcher") + + +def display_desktop_notification(title: str, details: str = "", icon: str = "") -> None: + interface = "org.freedesktop.Notifications" + path = "/org/freedesktop/Notifications" + notification = dbus.Interface( + dbus.SessionBus().get_object(interface, path), interface + ) + try: + notification.Notify("Clipboard-Watcher", 0, icon, title, details, [], [], 7000) + except Exception: + logger.error("Unable to publish notification due to dbus error") diff --git a/clipboard_watcher/xoperations.py b/clipboard_watcher/xoperations.py index 252cf6a..1090de7 100644 --- a/clipboard_watcher/xoperations.py +++ b/clipboard_watcher/xoperations.py @@ -1,14 +1,15 @@ import logging from typing import List, Optional, Tuple +from queue import Queue from Xlib import X, Xatom from Xlib.protocol import event - -logger = logging.getLogger(__name__) +logger = logging.getLogger("ClipboardWatcher") def get_selection_targets(disp, win, selection: str) -> List[str]: + """Query a selection owner for a list of all available targets.""" data_info = get_selection_data(disp, win, selection, "TARGETS") if not data_info or data_info[1] != 32 and data_info[2] != Xatom.ATOM: return [] @@ -18,7 +19,7 @@ def get_selection_targets(disp, win, selection: str) -> List[str]: def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple]: - """Get information from a particular selection.""" + """Retrieve the data for a given target from the selection owner.""" sel_atom = disp.get_atom(selection) target_atom = disp.get_atom(target) data_atom = disp.get_atom("SEL_DATA") @@ -29,8 +30,6 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple logger.info("No owner for selection %s", selection) return - # Ask for the selection. We shouldn't use X.CurrentTime, but - # since we don't have an event here we have to. win.convert_selection(sel_atom, target_atom, data_atom, X.CurrentTime) # Wait for the notification that we got the selection @@ -44,7 +43,7 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple logger.info("SelectionNotify event does not match our request: %s", e) if e.property == X.NONE: - logger.info("selection lost or conversion to TEXT failed") + logger.info("Selection lost or conversion to TEXT failed") return if e.property != data_atom: @@ -57,7 +56,7 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple # Can the data be used directly or read incrementally if r.property_type == disp.get_atom("INCR"): - logger.info("reading data incrementally: at least %d bytes", r.value[0]) + logger.info("Reading data incrementally: at least %d bytes", r.value[0]) data = _handle_incr(disp, win, data_atom) else: data = r.value @@ -68,17 +67,13 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple def _handle_incr(d, w, data_atom): - # This works by us removing the data property, the selection owner - # getting a notification of that, and then setting the property - # again with more data. To notice that, we must listen for - # PropertyNotify events. + """Handle the selection's data, when it is provided in chunks""" w.change_attributes(event_mask=X.PropertyChangeMask) data = None while True: # Delete data property to tell owner to give us more data w.delete_property(data_atom) - # Wait for notification that we got data while True: e = d.next_event() @@ -110,7 +105,7 @@ def process_selection_request_event(d, cb_data, e): if e.property == X.NONE: logger.info("request from obsolete client!") - client_prop = e.target # per ICCCM recommendation + client_prop = e.target else: client_prop = e.property @@ -193,7 +188,7 @@ def process_selection_request_event(d, cb_data, e): def process_selection_clear_event(d, cb_data, e): - logger.warning("New content on %s, assuming ownership", e.atom) + logger.warning("New content on %s, assuming ownership", d.get_atom_name(e.atom)) if e.atom == d.get_atom("PRIMARY"): cb_data.refresh_primary() elif e.atom == d.get_atom("CLIPBOARD"): @@ -204,15 +199,33 @@ def process_selection_clear_event(d, cb_data, e): logger.warning("Owner again") -def process_event_loop(d, w, cb_data): - running = True - while running: +def process_event_loop(d, w, q: Queue, cb_data): + while True: e = d.next_event() if ( e.type == X.SelectionRequest and e.owner == w and e.selection in cb_data.name_atoms(d) ): + req_id = e.requestor.id + req_name = e.requestor.get_wm_name() + req_pid = e.requestor.get_property( + d.get_atom("_NET_WM_PID"), d.get_atom("CARDINAL"), 0, 1024 + ) + extra = d.get_atom_name(e.property) process_selection_request_event(d, cb_data, e) + if d.get_atom_name(e.target) != "TARGETS": + q.put( + { + "id": req_id, + "window_name": req_name, + "pid": req_pid, + "target": d.get_atom_name(e.target), + "selection": d.get_atom_name(e.selection), + "extra": extra, + }, + block=False, + ) + elif e.type == X.SelectionClear and e.window == w: process_selection_clear_event(d, cb_data, e) diff --git a/poetry.lock b/poetry.lock index 6e6d077..427a1da 100644 --- a/poetry.lock +++ b/poetry.lock @@ -65,6 +65,14 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "dbus-python" +version = "1.2.18" +description = "Python bindings for libdbus" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "more-itertools" version = "8.12.0" @@ -210,7 +218,7 @@ python-versions = "*" [metadata] lock-version = "1.1" python-versions = ">=3.9,<3.11" -content-hash = "cd2992975cbf0dc139f3a59fb907529adc5c15de7cb62f05a3fa62145b0bd8d3" +content-hash = "6231f2ce7abc15e8e1fb0d98ea4705c356e6322e24ab4f9cfae2a223813c7bbb" [metadata.files] atomicwrites = [ @@ -233,6 +241,9 @@ colorama = [ {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"}, {file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"}, ] +dbus-python = [ + {file = "dbus-python-1.2.18.tar.gz", hash = "sha256:92bdd1e68b45596c833307a5ff4b217ee6929a1502f5341bae28fd120acf7260"}, +] more-itertools = [ {file = "more-itertools-8.12.0.tar.gz", hash = "sha256:7dc6ad46f05f545f900dd59e8dfb4e84a4827b97b3cfecb175ea0c7d247f6064"}, {file = "more_itertools-8.12.0-py3-none-any.whl", hash = "sha256:43e6dd9942dffd72661a2c4ef383ad7da1e6a3e968a927ad7a6083ab410a688b"}, diff --git a/pyproject.toml b/pyproject.toml index 663398e..7797855 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ authors = ["Gonçalo Valério "] [tool.poetry.dependencies] python = ">=3.9,<3.11" python-xlib = "^0.31" +dbus-python = "^1.2.18" [tool.poetry.dev-dependencies] pytest = "^5.2"