First working version. Assumes ownership of clipboard selections and and logs all requests from other apps

This commit is contained in:
Gonçalo Valério 2022-01-15 18:03:18 +00:00
parent 1cdb34b8d4
commit 2d493b8d6c
Signed by: dethos
GPG Key ID: DF557F2BDCC2445E
5 changed files with 361 additions and 7 deletions

View File

@ -1,2 +1,46 @@
def main(*args):
print("Working")
import logging
import sys
from argparse import ArgumentParser
from Xlib import X, display
from .models import ClipboardData
from .xoperations import process_event_loop
logger = logging.getLogger()
def set_logger_settings(level_name: str) -> None:
level = logging.getLevelName(level_name)
logging.basicConfig(stream=sys.stdout, level=level)
def main() -> None:
parser = ArgumentParser(
"Monitors the access of other processes to the clipboard contents."
)
parser.add_argument("-l", "--loglevel", help="Choose the log level")
args = parser.parse_args()
if args.loglevel and args.loglevel in ["DEBUG", "INFO", "WARNING", "ERROR"]:
set_logger_settings(args.loglevel)
else:
set_logger_settings("WARNING")
logger.info("Initializing Xclient")
d = display.Display()
# Create ourselves a window and a property for the returned data
w = d.screen().root.create_window(0, 0, 10, 10, 0, X.CopyFromParent)
w.set_wm_name("clipboard_watcher")
logger.debug("Getting selection data")
cb_data = ClipboardData(d, w, {}, {})
cb_data.refresh_all()
logger.debug("Taken ownership of all selections")
logger.info("Will start processing requests")
process_event_loop(d, w, cb_data)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,64 @@
from dataclasses import dataclass
from typing import Optional, Any, Dict, List
import logging
from Xlib import X
from .xoperations import get_selection_data, get_selection_targets
logger = logging.getLogger(__name__)
@dataclass
class SelectionValue:
value: Optional[Any] # TODO figure out later
format: Optional[int]
type: Optional[Any] # TODO must figure later
@dataclass
class SelectionTarget:
target: str
data: SelectionValue
@dataclass
class ClipboardData:
display: Any # TODO Add types later
window: Any # TODO Add types later
primary: Dict[str, SelectionTarget]
clipboard: Dict[str, SelectionTarget]
def _refresh_selection(
self, selection: str, content: Dict[str, SelectionTarget]
) -> None:
targets = get_selection_targets(self.display, self.window, selection)
logger.debug("Got %s for selection %s", targets, selection)
for target in targets:
if target in ["TARGETS", "SAVE_TARGETS"]:
continue
data = get_selection_data(self.display, self.window, selection, target)
if data:
value = SelectionValue(*data)
content[target] = SelectionTarget(target, value)
def refresh_primary(self) -> None:
selection = "PRIMARY"
sel_atom = self.display.get_atom(selection)
self.primary = {}
self._refresh_selection(selection, self.primary)
self.window.set_selection_owner(sel_atom, X.CurrentTime)
def refresh_clipboard(self) -> None:
selection = "CLIPBOARD"
sel_atom = self.display.get_atom(selection)
self.clipboard = {}
self._refresh_selection(selection, self.clipboard)
self.window.set_selection_owner(sel_atom, X.CurrentTime)
def refresh_all(self) -> None:
self.refresh_primary()
self.refresh_clipboard()
def name_atoms(self, d) -> List[str]:
return [d.get_atom(sel) for sel in ["PRIMARY", "CLIPBOARD"]]

View File

@ -0,0 +1,218 @@
import logging
from typing import List, Optional, Tuple
from Xlib import X, Xatom
from Xlib.protocol import event
logger = logging.getLogger(__name__)
def get_selection_targets(disp, win, selection: str) -> List[str]:
data_info = get_selection_data(disp, win, selection, "TARGETS")
if not data_info or data_info[1] != 32 and data_info[2] != Xatom.ATOM:
return []
data, *_ = data_info
return [disp.get_atom_name(a) for a in data]
def get_selection_data(disp, win, selection: str, target: str) -> Optional[Tuple]:
"""Get information from a particular selection."""
sel_atom = disp.get_atom(selection)
target_atom = disp.get_atom(target)
data_atom = disp.get_atom("SEL_DATA")
# Ask the server who owns this selection, if any
owner = disp.get_selection_owner(sel_atom)
if owner == X.NONE:
logger.info("No owner for selection %s", selection)
return
# Ask for the selection. We shouldn't use X.CurrentTime, but
# since we don't have an event here we have to.
win.convert_selection(sel_atom, target_atom, data_atom, X.CurrentTime)
# Wait for the notification that we got the selection
while True:
e = disp.next_event()
if e.type == X.SelectionNotify:
break
# Do some sanity checks
if e.requestor != win or e.selection != sel_atom or e.target != target_atom:
logger.info("SelectionNotify event does not match our request: %s", e)
if e.property == X.NONE:
logger.info("selection lost or conversion to TEXT failed")
return
if e.property != data_atom:
logger.info("SelectionNotify event does not match our request: %s", e)
# Get the data
r = win.get_full_property(data_atom, X.AnyPropertyType, sizehint=10000)
if not r:
return
# Can the data be used directly or read incrementally
if r.property_type == disp.get_atom("INCR"):
logger.info("reading data incrementally: at least %d bytes", r.value[0])
data = _handle_incr(disp, win, data_atom)
else:
data = r.value
# Tell selection owner that we're done
win.delete_property(data_atom)
return (data, r.format, r.property_type)
def _handle_incr(d, w, data_atom):
# This works by us removing the data property, the selection owner
# getting a notification of that, and then setting the property
# again with more data. To notice that, we must listen for
# PropertyNotify events.
w.change_attributes(event_mask=X.PropertyChangeMask)
data = None
while True:
# Delete data property to tell owner to give us more data
w.delete_property(data_atom)
# Wait for notification that we got data
while True:
e = d.next_event()
if (
e.type == X.PropertyNotify
and e.state == X.PropertyNewValue
and e.window == w
and e.atom == data_atom
):
break
r = w.get_full_property(data_atom, X.AnyPropertyType, sizehint=10000)
# End of data
if len(r.value) == 0:
return data
if not data:
data = e.value
continue
data += r.value
def process_selection_request_event(d, cb_data, e):
logger.debug("Selection %s request from %s", e.selection, e.requestor.get_wm_name())
client = e.requestor
targets_atom = d.get_atom("TARGETS")
if e.property == X.NONE:
logger.info("request from obsolete client!")
client_prop = e.target # per ICCCM recommendation
else:
client_prop = e.property
target_name = d.get_atom_name(e.target)
logger.info(
"got request for %s, dest %s on %d %s",
target_name,
d.get_atom_name(client_prop),
client.id,
client.get_wm_name(),
)
if e.selection == d.get_atom("PRIMARY"):
if target_name == "TARGETS":
atoms = [d.get_atom(name) for name in cb_data.primary.keys()]
prop = {
"value": [targets_atom] + atoms,
"format": 32,
"type": Xatom.ATOM,
}
elif target_name in cb_data.primary.keys():
cb_values = cb_data.primary[target_name].data
prop = {
"value": cb_values.value,
"format": cb_values.format,
"type": cb_values.type,
}
else:
logger.warning("Invalid target")
client_prop = X.NONE
prop = None
elif e.selection == d.get_atom("CLIPBOARD"):
if target_name == "TARGETS":
atoms = [d.get_atom(name) for name in cb_data.clipboard.keys()]
prop = {
"value": [targets_atom] + atoms,
"format": 32,
"type": Xatom.ATOM,
}
elif target_name in cb_data.clipboard.keys():
cb_values = cb_data.clipboard[target_name].data
prop = {
"value": cb_values.value,
"format": cb_values.format,
"type": cb_values.type,
}
else:
logger.info("Received selection request for invalid target")
client_prop = X.NONE
prop = None
else:
logger.info("Received event for other selection")
client_prop = X.NONE
prop = None
if client_prop != X.NONE:
if prop is not None:
client.change_property(
client_prop, prop["type"], prop["format"], prop["value"]
)
# And always send a selection notification
ev = event.SelectionNotify(
time=e.time,
requestor=e.requestor,
selection=e.selection,
target=e.target,
property=client_prop,
)
client.send_event(ev)
logger.warning(
"Sent %s (target: %s) selection to %s (%s)",
d.get_atom_name(e.selection),
target_name,
e.requestor.get_wm_name(),
e.requestor.id,
)
def process_selection_clear_event(d, cb_data, e):
logger.warning("New content on %s, assuming ownership", e.atom)
if e.atom == d.get_atom("PRIMARY"):
cb_data.refresh_primary()
elif e.atom == d.get_atom("CLIPBOARD"):
cb_data.refresh_clipboard()
else:
return
logger.warning("Owner again")
def process_event_loop(d, w, cb_data):
running = True
while running:
e = d.next_event()
if (
e.type == X.SelectionRequest
and e.owner == w
and e.selection in cb_data.name_atoms(d)
):
process_selection_request_event(d, cb_data, e)
elif e.type == X.SelectionClear and e.window == w:
process_selection_clear_event(d, cb_data, e)

31
poetry.lock generated
View File

@ -164,6 +164,25 @@ wcwidth = "*"
checkqa-mypy = ["mypy (==v0.761)"]
testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"]
[[package]]
name = "python-xlib"
version = "0.31"
description = "Python X Library"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
six = ">=1.10.0"
[[package]]
name = "six"
version = "1.16.0"
description = "Python 2 and 3 compatibility utilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "tomli"
version = "1.2.3"
@ -190,8 +209,8 @@ python-versions = "*"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "964d715983d5a8a4b28ae866a9b883c3e38fb493a3b8722107d760fac875d999"
python-versions = ">=3.9,<3.11"
content-hash = "cd2992975cbf0dc139f3a59fb907529adc5c15de7cb62f05a3fa62145b0bd8d3"
[metadata.files]
atomicwrites = [
@ -250,6 +269,14 @@ pytest = [
{file = "pytest-5.4.3-py3-none-any.whl", hash = "sha256:5c0db86b698e8f170ba4582a492248919255fcd4c79b1ee64ace34301fb589a1"},
{file = "pytest-5.4.3.tar.gz", hash = "sha256:7979331bfcba207414f5e1263b5a0f8f521d0f457318836a7355531ed1a4c7d8"},
]
python-xlib = [
{file = "python-xlib-0.31.tar.gz", hash = "sha256:74d83a081f532bc07f6d7afcd6416ec38403d68f68b9b9dc9e1f28fbf2d799e9"},
{file = "python_xlib-0.31-py2.py3-none-any.whl", hash = "sha256:1ec6ce0de73d9e6592ead666779a5732b384e5b8fb1f1886bd0a81cafa477759"},
]
six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
tomli = [
{file = "tomli-1.2.3-py3-none-any.whl", hash = "sha256:e3069e4be3ead9668e21cb9b074cd948f7b3113fd9c8bba083f48247aab8b11c"},
{file = "tomli-1.2.3.tar.gz", hash = "sha256:05b6166bff487dc068d322585c7ea4ef78deed501cc124060e0f238e89a9231f"},

View File

@ -1,11 +1,12 @@
[tool.poetry]
name = "clipboard-watcher"
version = "0.1.0"
description = ""
version = "0.0.1"
description = "Monitors the access of other processes to the clipboard contents."
authors = ["Gonçalo Valério <gon@ovalerio.net>"]
[tool.poetry.dependencies]
python = "^3.9"
python = ">=3.9,<3.11"
python-xlib = "^0.31"
[tool.poetry.dev-dependencies]
pytest = "^5.2"