diff --git a/clipboard_watcher/app.py b/clipboard_watcher/app.py index 149ea7a..b18c8d5 100644 --- a/clipboard_watcher/app.py +++ b/clipboard_watcher/app.py @@ -1,2 +1,46 @@ -def main(*args): - print("Working") +import logging +import sys +from argparse import ArgumentParser + +from Xlib import X, display + +from .models import ClipboardData +from .xoperations import process_event_loop + + +logger = logging.getLogger() + + +def set_logger_settings(level_name: str) -> None: + level = logging.getLevelName(level_name) + logging.basicConfig(stream=sys.stdout, level=level) + + +def main() -> None: + parser = ArgumentParser( + "Monitors the access of other processes to the clipboard contents." + ) + parser.add_argument("-l", "--loglevel", help="Choose the log level") + args = parser.parse_args() + if args.loglevel and args.loglevel in ["DEBUG", "INFO", "WARNING", "ERROR"]: + set_logger_settings(args.loglevel) + else: + set_logger_settings("WARNING") + + logger.info("Initializing Xclient") + d = display.Display() + # Create ourselves a window and a property for the returned data + w = d.screen().root.create_window(0, 0, 10, 10, 0, X.CopyFromParent) + w.set_wm_name("clipboard_watcher") + + logger.debug("Getting selection data") + cb_data = ClipboardData(d, w, {}, {}) + cb_data.refresh_all() + logger.debug("Taken ownership of all selections") + + logger.info("Will start processing requests") + process_event_loop(d, w, cb_data) + + +if __name__ == "__main__": + main() diff --git a/clipboard_watcher/models.py b/clipboard_watcher/models.py new file mode 100644 index 0000000..4b6f5d6 --- /dev/null +++ b/clipboard_watcher/models.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass +from typing import Optional, Any, Dict, List +import logging + +from Xlib import X + +from .xoperations import get_selection_data, get_selection_targets + +logger = logging.getLogger(__name__) + + +@dataclass +class SelectionValue: + value: Optional[Any] # TODO figure out later + format: Optional[int] + type: Optional[Any] # TODO must figure later + + +@dataclass +class SelectionTarget: + target: str + data: SelectionValue + + +@dataclass +class ClipboardData: + display: Any # TODO Add types later + window: Any # TODO Add types later + primary: Dict[str, SelectionTarget] + clipboard: Dict[str, SelectionTarget] + + def _refresh_selection( + self, selection: str, content: Dict[str, SelectionTarget] + ) -> None: + targets = get_selection_targets(self.display, self.window, selection) + logger.debug("Got %s for selection %s", targets, selection) + for target in targets: + if target in ["TARGETS", "SAVE_TARGETS"]: + continue + data = get_selection_data(self.display, self.window, selection, target) + if data: + value = SelectionValue(*data) + content[target] = SelectionTarget(target, value) + + def refresh_primary(self) -> None: + selection = "PRIMARY" + sel_atom = self.display.get_atom(selection) + self.primary = {} + self._refresh_selection(selection, self.primary) + self.window.set_selection_owner(sel_atom, X.CurrentTime) + + def refresh_clipboard(self) -> None: + selection = "CLIPBOARD" + sel_atom = self.display.get_atom(selection) + self.clipboard = {} + self._refresh_selection(selection, self.clipboard) + self.window.set_selection_owner(sel_atom, X.CurrentTime) + + def refresh_all(self) -> None: + self.refresh_primary() + self.refresh_clipboard() + + def name_atoms(self, d) -> List[str]: + return [d.get_atom(sel) for sel in ["PRIMARY", "CLIPBOARD"]] diff --git a/clipboard_watcher/xoperations.py b/clipboard_watcher/xoperations.py new file mode 100644 index 0000000..252cf6a --- /dev/null +++ b/clipboard_watcher/xoperations.py @@ -0,0 +1,218 @@ +import logging +from typing import List, Optional, Tuple + +from Xlib import X, Xatom +from Xlib.protocol import event + + +logger = logging.getLogger(__name__) + + +def get_selection_targets(disp, win, selection: str) -> List[str]: + data_info = get_selection_data(disp, win, selection, "TARGETS") + if not data_info or data_info[1] != 32 and data_info[2] != Xatom.ATOM: + return [] + + data, *_ = data_info + return [disp.get_atom_name(a) for a in data] + + +def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple]: + """Get information from a particular selection.""" + sel_atom = disp.get_atom(selection) + target_atom = disp.get_atom(target) + data_atom = disp.get_atom("SEL_DATA") + + # Ask the server who owns this selection, if any + owner = disp.get_selection_owner(sel_atom) + if owner == X.NONE: + logger.info("No owner for selection %s", selection) + return + + # Ask for the selection. We shouldn't use X.CurrentTime, but + # since we don't have an event here we have to. + win.convert_selection(sel_atom, target_atom, data_atom, X.CurrentTime) + + # Wait for the notification that we got the selection + while True: + e = disp.next_event() + if e.type == X.SelectionNotify: + break + + # Do some sanity checks + if e.requestor != win or e.selection != sel_atom or e.target != target_atom: + logger.info("SelectionNotify event does not match our request: %s", e) + + if e.property == X.NONE: + logger.info("selection lost or conversion to TEXT failed") + return + + if e.property != data_atom: + logger.info("SelectionNotify event does not match our request: %s", e) + + # Get the data + r = win.get_full_property(data_atom, X.AnyPropertyType, sizehint=10000) + if not r: + return + + # Can the data be used directly or read incrementally + if r.property_type == disp.get_atom("INCR"): + logger.info("reading data incrementally: at least %d bytes", r.value[0]) + data = _handle_incr(disp, win, data_atom) + else: + data = r.value + + # Tell selection owner that we're done + win.delete_property(data_atom) + return (data, r.format, r.property_type) + + +def _handle_incr(d, w, data_atom): + # This works by us removing the data property, the selection owner + # getting a notification of that, and then setting the property + # again with more data. To notice that, we must listen for + # PropertyNotify events. + w.change_attributes(event_mask=X.PropertyChangeMask) + data = None + + while True: + # Delete data property to tell owner to give us more data + w.delete_property(data_atom) + + # Wait for notification that we got data + while True: + e = d.next_event() + if ( + e.type == X.PropertyNotify + and e.state == X.PropertyNewValue + and e.window == w + and e.atom == data_atom + ): + break + + r = w.get_full_property(data_atom, X.AnyPropertyType, sizehint=10000) + + # End of data + if len(r.value) == 0: + return data + + if not data: + data = e.value + continue + + data += r.value + + +def process_selection_request_event(d, cb_data, e): + logger.debug("Selection %s request from %s", e.selection, e.requestor.get_wm_name()) + client = e.requestor + targets_atom = d.get_atom("TARGETS") + + if e.property == X.NONE: + logger.info("request from obsolete client!") + client_prop = e.target # per ICCCM recommendation + else: + client_prop = e.property + + target_name = d.get_atom_name(e.target) + + logger.info( + "got request for %s, dest %s on %d %s", + target_name, + d.get_atom_name(client_prop), + client.id, + client.get_wm_name(), + ) + + if e.selection == d.get_atom("PRIMARY"): + if target_name == "TARGETS": + atoms = [d.get_atom(name) for name in cb_data.primary.keys()] + prop = { + "value": [targets_atom] + atoms, + "format": 32, + "type": Xatom.ATOM, + } + elif target_name in cb_data.primary.keys(): + cb_values = cb_data.primary[target_name].data + prop = { + "value": cb_values.value, + "format": cb_values.format, + "type": cb_values.type, + } + else: + logger.warning("Invalid target") + client_prop = X.NONE + prop = None + elif e.selection == d.get_atom("CLIPBOARD"): + if target_name == "TARGETS": + atoms = [d.get_atom(name) for name in cb_data.clipboard.keys()] + prop = { + "value": [targets_atom] + atoms, + "format": 32, + "type": Xatom.ATOM, + } + elif target_name in cb_data.clipboard.keys(): + cb_values = cb_data.clipboard[target_name].data + prop = { + "value": cb_values.value, + "format": cb_values.format, + "type": cb_values.type, + } + else: + logger.info("Received selection request for invalid target") + client_prop = X.NONE + prop = None + else: + logger.info("Received event for other selection") + client_prop = X.NONE + prop = None + + if client_prop != X.NONE: + if prop is not None: + client.change_property( + client_prop, prop["type"], prop["format"], prop["value"] + ) + + # And always send a selection notification + ev = event.SelectionNotify( + time=e.time, + requestor=e.requestor, + selection=e.selection, + target=e.target, + property=client_prop, + ) + + client.send_event(ev) + logger.warning( + "Sent %s (target: %s) selection to %s (%s)", + d.get_atom_name(e.selection), + target_name, + e.requestor.get_wm_name(), + e.requestor.id, + ) + + +def process_selection_clear_event(d, cb_data, e): + logger.warning("New content on %s, assuming ownership", e.atom) + if e.atom == d.get_atom("PRIMARY"): + cb_data.refresh_primary() + elif e.atom == d.get_atom("CLIPBOARD"): + cb_data.refresh_clipboard() + else: + return + + logger.warning("Owner again") + + +def process_event_loop(d, w, cb_data): + running = True + while running: + e = d.next_event() + if ( + e.type == X.SelectionRequest + and e.owner == w + and e.selection in cb_data.name_atoms(d) + ): + process_selection_request_event(d, cb_data, e) + elif e.type == X.SelectionClear and e.window == w: + process_selection_clear_event(d, cb_data, e) diff --git a/poetry.lock b/poetry.lock index cdd5fcd..6e6d077 100644 --- a/poetry.lock +++ b/poetry.lock @@ -164,6 +164,25 @@ wcwidth = "*" checkqa-mypy = ["mypy (==v0.761)"] testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"] +[[package]] +name = "python-xlib" +version = "0.31" +description = "Python X Library" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +six = ">=1.10.0" + +[[package]] +name = "six" +version = "1.16.0" +description = "Python 2 and 3 compatibility utilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "tomli" version = "1.2.3" @@ -190,8 +209,8 @@ python-versions = "*" [metadata] lock-version = "1.1" -python-versions = "^3.9" -content-hash = "964d715983d5a8a4b28ae866a9b883c3e38fb493a3b8722107d760fac875d999" +python-versions = ">=3.9,<3.11" +content-hash = "cd2992975cbf0dc139f3a59fb907529adc5c15de7cb62f05a3fa62145b0bd8d3" [metadata.files] atomicwrites = [ @@ -250,6 +269,14 @@ pytest = [ {file = "pytest-5.4.3-py3-none-any.whl", hash = "sha256:5c0db86b698e8f170ba4582a492248919255fcd4c79b1ee64ace34301fb589a1"}, {file = "pytest-5.4.3.tar.gz", hash = "sha256:7979331bfcba207414f5e1263b5a0f8f521d0f457318836a7355531ed1a4c7d8"}, ] +python-xlib = [ + {file = "python-xlib-0.31.tar.gz", hash = "sha256:74d83a081f532bc07f6d7afcd6416ec38403d68f68b9b9dc9e1f28fbf2d799e9"}, + {file = "python_xlib-0.31-py2.py3-none-any.whl", hash = "sha256:1ec6ce0de73d9e6592ead666779a5732b384e5b8fb1f1886bd0a81cafa477759"}, +] +six = [ + {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, + {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, +] tomli = [ {file = "tomli-1.2.3-py3-none-any.whl", hash = "sha256:e3069e4be3ead9668e21cb9b074cd948f7b3113fd9c8bba083f48247aab8b11c"}, {file = "tomli-1.2.3.tar.gz", hash = "sha256:05b6166bff487dc068d322585c7ea4ef78deed501cc124060e0f238e89a9231f"}, diff --git a/pyproject.toml b/pyproject.toml index 29ad756..663398e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,12 @@ [tool.poetry] name = "clipboard-watcher" -version = "0.1.0" -description = "" +version = "0.0.1" +description = "Monitors the access of other processes to the clipboard contents." authors = ["Gonçalo Valério "] [tool.poetry.dependencies] -python = "^3.9" +python = ">=3.9,<3.11" +python-xlib = "^0.31" [tool.poetry.dev-dependencies] pytest = "^5.2"