-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtalon_command_server.py
More file actions
218 lines (188 loc) · 7.38 KB
/
talon_command_server.py
File metadata and controls
218 lines (188 loc) · 7.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# This is a naive port of pokey's TypeScript Talon command server, which was
# originally written for VS Code: https://github.com/pokey/command-server
#
# This is designed to be usable generically by any python extension that needs
# to act as a command server. See documentation for API usage.
#
# TODO:
# - How do we deal with multiple instances of the same app?
import pathlib
import stat
import os
import json
import datetime
import time
import threading
import queue
class TalonCommandServer:
    """File-based command server.

    A client drops a JSON request into ``request.json`` inside a per-user
    communication directory; the server runs the requested command through a
    caller-supplied handler and writes a JSON reply to ``response.json`` in
    the same directory.

    After construction, check ``init_ok``: it is False when the communication
    directory could not be created or failed the sanity checks.
    """

    REQUEST_PATH = "request.json"
    RESPONSE_PATH = "response.json"
    STALE_TIMEOUT_MS = 60000
    # A request file older than this (by mtime) is discarded unread.
    COMMAND_TIMEOUT_MS = 3000

    # Keys every request must contain; read_request() documents each one.
    REQUIRED_FIELDS = (
        "commandId",
        "args",
        "uuid",
        "returnCommandOutput",
        "waitForFinish",
    )

    class _HandlerFailure:
        """Queue marker carrying an exception raised by the command handler."""

        def __init__(self, exc):
            self.exc = exc

    def __init__(self, path):
        """Set up the communication directory.

        Args:
            path: Base path of the communication directory. Where
                ``os.getuid`` exists, ``-<uid>`` is appended to it.
        """
        suffix = f"-{os.getuid()}" if hasattr(os, "getuid") else ""
        self.communication_directory = pathlib.Path(f"{path}{suffix}")
        self.request_file = self.communication_directory / self.REQUEST_PATH
        self.response_file = self.communication_directory / self.RESPONSE_PATH

        self.init_ok = bool(self.initialize_communication_dir())
        if not self.init_ok:
            print("ERROR: Unable to initialize communication directory")

    def read_request(self):
        """Read the JSON-encoded request from the request file.

        The request file is unlinked once it has been read or found stale.
        An empty or not-yet-parseable file is left in place (the client may
        still be writing it) and is only removed once it becomes stale.

        A request is formatted as:

        commandId: string;
          - The id of the command to run
        uuid: string;
          - A uuid that will be written to the response file for sanity
            checking client-side
        args: list;
          - Arguments to the command, if any
        returnCommandOutput: boolean;
          - A boolean indicating if we should return the output of the command
        waitForFinish: boolean;
          - A boolean indicating if we should await the command to ensure it
            is complete. This behaviour is desirable for some commands and
            not others. For most commands it is ok, and can remove race
            conditions, but for some commands, such as ones that show a quick
            picker, it can hang the client

        Returns:
            The parsed request, or None when there is nothing usable to read.
        """
        try:
            stats = self.request_file.stat()
        except FileNotFoundError:
            # The file vanished between the caller's exists() check and now.
            return None

        # Compare in float milliseconds: truncating to whole seconds first
        # would let a 3.9 s old request pass a 3000 ms limit.
        age_ms = (time.time() - stats.st_mtime) * 1000
        request = None
        if age_ms > self.COMMAND_TIMEOUT_MS:
            print("WARNING: Request is stale. Will delete.")
        else:
            if stats.st_size == 0:
                return None
            try:
                with self.request_file.open("r") as request_fp:
                    request = json.load(request_fp)
            except json.decoder.JSONDecodeError:
                return None

        try:
            self.request_file.unlink()
        except FileNotFoundError:
            pass  # already removed by someone else; nothing left to clean up
        return request

    def write_response(self, response):
        """Write the JSON-encoded response, newline-terminated, to the response file."""
        # XXX - Cursorless uses wx, which fails if the file exists...
        with open(self.response_file, "w+") as f:
            f.write(json.dumps(response) + "\n")

    def initialize_communication_dir(self):
        """Create the communication directory and sanity-check it.

        Returns:
            True when the directory exists and passes the checks, else False
            (an error line is printed in that case).
        """
        path = self.communication_directory
        try:
            path.mkdir(mode=0o770, parents=True, exist_ok=True)
            stats = path.stat()
        except OSError:
            # e.g. the path exists but is not a directory, or no permission.
            print(
                f"ERROR: Unable to create communication directory: {self.communication_directory}"
            )
            return False

        # The world-writable and ownership checks need POSIX semantics and
        # os.getuid(); __init__ already treats getuid as optional, so do the
        # same here instead of raising AttributeError.
        # NOTE(review): on Windows st_mode permission bits are believed to be
        # synthesised and not meaningful for this check - confirm.
        has_uid = hasattr(os, "getuid")
        world_writable = has_uid and bool(stats.st_mode & stat.S_IWOTH)
        foreign_owner = has_uid and stats.st_uid != os.getuid()

        if (
            not path.is_dir()
            or path.is_symlink()
            or world_writable
            or foreign_owner
        ):
            print(
                f"ERROR: Unable to create communication directory: {self.communication_directory}"
            )
            return False
        return True

    def validate_request(self, request):
        """Check that *request* is a dict containing every required field.

        Prints one error line per missing field, and the rejected request
        itself, and returns False; returns True for a well-formed request.
        """
        if not isinstance(request, dict):
            print("ERROR: request is not a JSON object")
            print(request)
            return False

        missing = [field for field in self.REQUIRED_FIELDS if field not in request]
        for field in missing:
            print(f"ERROR: request is missing required field {field}")
        if missing:
            print(request)
            return False
        return True

    def command_thread(self, data_queue, request, command_handler):
        """Thread body: invoke the command handler for *request*.

        Args:
            data_queue: Queue that receives the handler's return value, or
                None when nobody is waiting for it.
            request: Validated request dict.
            command_handler: Callable invoked as
                ``command_handler(commandId, *args)``.
        """
        try:
            result = command_handler(request["commandId"], *request["args"])
        except BaseException as exc:
            # A waiter is blocked on the queue; it must always receive
            # something or it would hang forever. BaseException is caught on
            # purpose so the waiter is released whatever the handler raised.
            if data_queue is None:
                raise
            data_queue.put(self._HandlerFailure(exc))
            return
        if data_queue is not None:
            data_queue.put(result)

    def run_command_threaded(self, request, handler, do_async):
        """Run the command handler in a new thread.

        Args:
            request: Validated request dict.
            handler: Callable invoked as ``handler(commandId, *args)``.
            do_async: When true, start a daemon thread and return at once.
                When false, block until the handler finishes.

        Returns:
            None when *do_async* is true, otherwise the handler's return value.

        Raises:
            Whatever the handler raised, when *do_async* is false.
        """
        data_queue = None if do_async else queue.Queue()
        worker = threading.Thread(
            target=self.command_thread, args=(data_queue, request, handler)
        )
        if do_async:
            worker.daemon = True
            worker.start()
            return None

        worker.start()
        output = data_queue.get()
        worker.join()
        if isinstance(output, self._HandlerFailure):
            raise output.exc
        return output

    def handle_request(self, command_handler):
        """Read and answer a single pending request, if there is one.

        Args:
            command_handler: Callable invoked as
                ``command_handler(commandId, *args)``.

        Returns:
            True when a request was processed; False when there was no
            request, it could not be read, or it failed validation.
            Exceptions raised by the handler propagate to the caller.
        """
        if not self.request_file.exists():
            return False
        request = self.read_request()
        if not request:
            return False
        if not self.validate_request(request):
            print("WARNING: Received bad request. Ignoring")
            return False

        # XXX - Need to prep error and warning somehow
        error = None
        warnings = None
        command_id = request["commandId"]
        args = request["args"]

        if request["returnCommandOutput"] or request["waitForFinish"]:
            output = command_handler(command_id, *args)
            if not request["returnCommandOutput"]:
                output = None
            # Keywords keep warnings/error from being swapped positionally.
            self.send_response(
                output, request["uuid"], warnings=warnings, error=error
            )
        else:
            # Send the response first since the handler may block.
            self.send_response(
                None, request["uuid"], warnings=warnings, error=error
            )
            command_handler(command_id, *args)
        return True

    def send_response(self, output, uuid, warnings, error):
        """Build the response dict and write it to the response file.

        Args:
            output: Value stored under ``returnValue``.
            uuid: The request's uuid, echoed back under ``uuid``.
            warnings: Value stored under ``warnings``.
            error: Value stored under ``error``.
        """
        self.write_response(
            {
                "returnValue": output,
                "error": error,
                "uuid": uuid,
                "warnings": warnings,
            }
        )

    def command_loop(self, command_handler):
        """Loop indefinitely, polling for requests and answering them.

        Requests that ask for output or completion run synchronously; if the
        handler raises an Exception, its text is reported in the response's
        ``error`` field and the loop keeps running. All other requests are
        started on a daemon thread and answered immediately.
        """
        while True:
            if not self.request_file.exists():
                time.sleep(0.01)
                continue
            request = self.read_request()
            if not request:
                # Empty or half-written file: back off instead of spinning.
                time.sleep(0.01)
                continue
            if not self.validate_request(request):
                print("WARNING: Received bad request. Ignoring")
                continue

            wait_for_result = bool(
                request["returnCommandOutput"] or request["waitForFinish"]
            )
            output = None
            error = None
            try:
                output = self.run_command_threaded(
                    request, command_handler, not wait_for_result
                )
            except Exception as exc:
                error = f"{type(exc).__name__}: {exc}"
            if not request["returnCommandOutput"]:
                output = None
            self.send_response(
                output, request["uuid"], warnings=None, error=error
            )