Add system notifications with the currently available information

This commit is contained in:
Gonçalo Valério 2022-01-26 23:45:08 +00:00
parent 2d493b8d6c
commit 5530c8c213
Signed by: dethos
GPG Key ID: DF557F2BDCC2445E
7 changed files with 129 additions and 29 deletions

View File

@ -1,4 +1,33 @@
clipboard-watcher
=================
Keep an eye on the apps that are using your clipboard
This repository contains the code of an experiment, in order to understand
how hard would it be (if possible) to have an application that can monitor
the access of other apps to the clipboard on Linux machines.
At the moment it only supports desktop environments that use X.
To learn more please read the
`original blog post <https://blog.ovalerio.net/archives/2346>`_.
Installation
------------
To build and run this demo app, you will need to have the following libraries
on your system:
* ``libdbus-1-dev``
* ```libglib2.0-dev```
You will also need to have `Poetry <http://www.python.org/>`_ in order to be
able to execute the following commands:
.. code-block::
$ poetry install
$ poetry run watcher
Contributions
-------------
All contributions and improvements are welcome.

View File

@ -1,14 +1,17 @@
import logging
import sys
from argparse import ArgumentParser
from threading import Thread
from queue import Queue
from Xlib import X, display
from .models import ClipboardData
from .xoperations import process_event_loop
from .notifications import display_desktop_notification
logger = logging.getLogger()
logger = logging.getLogger("ClipboardWatcher")
def set_logger_settings(level_name: str) -> None:
@ -16,6 +19,15 @@ def set_logger_settings(level_name: str) -> None:
logging.basicConfig(stream=sys.stdout, level=level)
def process_notifications(q: Queue):
while True:
req = q.get(block=True)
display_desktop_notification(
f"New access to {req['selection']}({req['target']}) detected.",
f"Window: {req['window_name']} (id: {req['id']})\nPossible PID: {req['pid']} | Extra Info: {req['extra']}",
)
def main() -> None:
parser = ArgumentParser(
"Monitors the access of other processes to the clipboard contents."
@ -25,21 +37,39 @@ def main() -> None:
if args.loglevel and args.loglevel in ["DEBUG", "INFO", "WARNING", "ERROR"]:
set_logger_settings(args.loglevel)
else:
set_logger_settings("WARNING")
set_logger_settings("INFO")
logger.info("Initializing Xclient")
d = display.Display()
logger.info("Initializing X client")
disp = display.Display()
# Create ourselves a window and a property for the returned data
w = d.screen().root.create_window(0, 0, 10, 10, 0, X.CopyFromParent)
w.set_wm_name("clipboard_watcher")
window = disp.screen().root.create_window(0, 0, 10, 10, 0, X.CopyFromParent)
window.set_wm_name("clipboard_watcher")
logger.debug("Getting selection data")
cb_data = ClipboardData(d, w, {}, {})
cb_data = ClipboardData(disp, window, {}, {})
cb_data.refresh_all()
logger.debug("Taken ownership of all selections")
logger.info("Will start processing requests")
process_event_loop(d, w, cb_data)
job_queue = Queue()
# Thread 1
event_worker = Thread(
target=process_event_loop, args=(disp, window, job_queue, cb_data), daemon=True
)
# Thread 2
notif_worker = Thread(
target=process_notifications,
args=(job_queue,),
daemon=True,
)
event_worker.start()
notif_worker.start()
logger.info("Setup done. Keeping an eye on the clipboard")
try:
event_worker.join()
notif_worker.join()
except KeyboardInterrupt:
logger.info("Shutting down")
if __name__ == "__main__":

View File

@ -6,7 +6,7 @@ from Xlib import X
from .xoperations import get_selection_data, get_selection_targets
logger = logging.getLogger(__name__)
logger = logging.getLogger("ClipboardWatcher")
@dataclass

View File

@ -0,0 +1,16 @@
import dbus
import logging
logger = logging.getLogger("ClipboardWatcher")
def display_desktop_notification(title: str, details: str = "", icon: str = "") -> None:
interface = "org.freedesktop.Notifications"
path = "/org/freedesktop/Notifications"
notification = dbus.Interface(
dbus.SessionBus().get_object(interface, path), interface
)
try:
notification.Notify("Clipboard-Watcher", 0, icon, title, details, [], [], 7000)
except Exception:
logger.error("Unable to publish notification due to dbus error")

View File

@ -1,14 +1,15 @@
import logging
from typing import List, Optional, Tuple
from queue import Queue
from Xlib import X, Xatom
from Xlib.protocol import event
logger = logging.getLogger(__name__)
logger = logging.getLogger("ClipboardWatcher")
def get_selection_targets(disp, win, selection: str) -> List[str]:
"""Query a selection owner for a list of all available targets."""
data_info = get_selection_data(disp, win, selection, "TARGETS")
if not data_info or data_info[1] != 32 and data_info[2] != Xatom.ATOM:
return []
@ -18,7 +19,7 @@ def get_selection_targets(disp, win, selection: str) -> List[str]:
def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple]:
"""Get information from a particular selection."""
"""Retrieve the data for a given target from the selection owner."""
sel_atom = disp.get_atom(selection)
target_atom = disp.get_atom(target)
data_atom = disp.get_atom("SEL_DATA")
@ -29,8 +30,6 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple
logger.info("No owner for selection %s", selection)
return
# Ask for the selection. We shouldn't use X.CurrentTime, but
# since we don't have an event here we have to.
win.convert_selection(sel_atom, target_atom, data_atom, X.CurrentTime)
# Wait for the notification that we got the selection
@ -44,7 +43,7 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple
logger.info("SelectionNotify event does not match our request: %s", e)
if e.property == X.NONE:
logger.info("selection lost or conversion to TEXT failed")
logger.info("Selection lost or conversion to TEXT failed")
return
if e.property != data_atom:
@ -57,7 +56,7 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple
# Can the data be used directly or read incrementally
if r.property_type == disp.get_atom("INCR"):
logger.info("reading data incrementally: at least %d bytes", r.value[0])
logger.info("Reading data incrementally: at least %d bytes", r.value[0])
data = _handle_incr(disp, win, data_atom)
else:
data = r.value
@ -68,17 +67,13 @@ def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple
def _handle_incr(d, w, data_atom):
# This works by us removing the data property, the selection owner
# getting a notification of that, and then setting the property
# again with more data. To notice that, we must listen for
# PropertyNotify events.
"""Handle the selection's data, when it is provided in chunks"""
w.change_attributes(event_mask=X.PropertyChangeMask)
data = None
while True:
# Delete data property to tell owner to give us more data
w.delete_property(data_atom)
# Wait for notification that we got data
while True:
e = d.next_event()
@ -110,7 +105,7 @@ def process_selection_request_event(d, cb_data, e):
if e.property == X.NONE:
logger.info("request from obsolete client!")
client_prop = e.target # per ICCCM recommendation
client_prop = e.target
else:
client_prop = e.property
@ -193,7 +188,7 @@ def process_selection_request_event(d, cb_data, e):
def process_selection_clear_event(d, cb_data, e):
logger.warning("New content on %s, assuming ownership", e.atom)
logger.warning("New content on %s, assuming ownership", d.get_atom_name(e.atom))
if e.atom == d.get_atom("PRIMARY"):
cb_data.refresh_primary()
elif e.atom == d.get_atom("CLIPBOARD"):
@ -204,15 +199,33 @@ def process_selection_clear_event(d, cb_data, e):
logger.warning("Owner again")
def process_event_loop(d, w, cb_data):
running = True
while running:
def process_event_loop(d, w, q: Queue, cb_data):
while True:
e = d.next_event()
if (
e.type == X.SelectionRequest
and e.owner == w
and e.selection in cb_data.name_atoms(d)
):
req_id = e.requestor.id
req_name = e.requestor.get_wm_name()
req_pid = e.requestor.get_property(
d.get_atom("_NET_WM_PID"), d.get_atom("CARDINAL"), 0, 1024
)
extra = d.get_atom_name(e.property)
process_selection_request_event(d, cb_data, e)
if d.get_atom_name(e.target) != "TARGETS":
q.put(
{
"id": req_id,
"window_name": req_name,
"pid": req_pid,
"target": d.get_atom_name(e.target),
"selection": d.get_atom_name(e.selection),
"extra": extra,
},
block=False,
)
elif e.type == X.SelectionClear and e.window == w:
process_selection_clear_event(d, cb_data, e)

13
poetry.lock generated
View File

@ -65,6 +65,14 @@ category = "dev"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "dbus-python"
version = "1.2.18"
description = "Python bindings for libdbus"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "more-itertools"
version = "8.12.0"
@ -210,7 +218,7 @@ python-versions = "*"
[metadata]
lock-version = "1.1"
python-versions = ">=3.9,<3.11"
content-hash = "cd2992975cbf0dc139f3a59fb907529adc5c15de7cb62f05a3fa62145b0bd8d3"
content-hash = "6231f2ce7abc15e8e1fb0d98ea4705c356e6322e24ab4f9cfae2a223813c7bbb"
[metadata.files]
atomicwrites = [
@ -233,6 +241,9 @@ colorama = [
{file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"},
{file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"},
]
dbus-python = [
{file = "dbus-python-1.2.18.tar.gz", hash = "sha256:92bdd1e68b45596c833307a5ff4b217ee6929a1502f5341bae28fd120acf7260"},
]
more-itertools = [
{file = "more-itertools-8.12.0.tar.gz", hash = "sha256:7dc6ad46f05f545f900dd59e8dfb4e84a4827b97b3cfecb175ea0c7d247f6064"},
{file = "more_itertools-8.12.0-py3-none-any.whl", hash = "sha256:43e6dd9942dffd72661a2c4ef383ad7da1e6a3e968a927ad7a6083ab410a688b"},

View File

@ -7,6 +7,7 @@ authors = ["Gonçalo Valério <gon@ovalerio.net>"]
[tool.poetry.dependencies]
python = ">=3.9,<3.11"
python-xlib = "^0.31"
dbus-python = "^1.2.18"
[tool.poetry.dev-dependencies]
pytest = "^5.2"