1use std::{
2 marker::PhantomData,
3 ops::ControlFlow,
4 sync::{Arc, mpsc::Sender},
5 thread::Scope,
6};
7
8use dada_util::Fallible;
9use lsp_server::{Connection, Message, Notification, Request, Response};
10use lsp_types::{PublishDiagnosticsParams, notification, request};
11
12use super::{Editor, Lsp};
13
14pub(super) struct LspDispatch<'l, L: Lsp + 'l> {
15 connection: Arc<Connection>,
16 lsp: L,
17 notification_arms: Vec<Box<dyn NotificationArm<L> + 'l>>,
18 request_arms: Vec<Box<dyn RequestArm<L> + 'l>>,
19}
20
21trait NotificationArm<L> {
22 fn execute(
23 &self,
24 context: &mut L,
25 editor: &mut dyn Editor<L>,
26 notification: Notification,
27 ) -> Fallible<ControlFlow<(), Notification>>;
28}
29
30trait RequestArm<L> {
31 fn execute(
32 &self,
33 context: &mut L,
34 editor: &mut dyn Editor<L>,
35 request: Request,
36 ) -> Fallible<ControlFlow<Response, Request>>;
37}
38
39impl<'l, L: Lsp + 'l> LspDispatch<'l, L> {
40 pub fn new(connection: Connection, lsp: L) -> Self {
41 Self {
42 lsp,
43 connection: Arc::new(connection),
44 notification_arms: vec![],
45 request_arms: vec![],
46 }
47 }
48
49 pub fn on_notification<N>(
50 mut self,
51 execute: impl Fn(&mut L, &mut dyn Editor<L>, N::Params) -> Fallible<()> + 'l,
52 ) -> Self
53 where
54 N: notification::Notification + 'l,
55 {
56 struct NotificationArmImpl<N, F, L> {
57 notification: PhantomData<(N, L)>,
58 execute: F,
59 }
60
61 impl<L, N, F> NotificationArm<L> for NotificationArmImpl<N, F, L>
62 where
63 N: notification::Notification,
64 F: Fn(&mut L, &mut dyn Editor<L>, N::Params) -> Fallible<()>,
65 {
66 fn execute(
67 &self,
68 lsp: &mut L,
69 editor: &mut dyn Editor<L>,
70 notification: Notification,
71 ) -> Fallible<ControlFlow<(), Notification>> {
72 if notification.method != N::METHOD {
73 return Ok(ControlFlow::Continue(notification));
74 }
75
76 let params: N::Params = serde_json::from_value(notification.params)?;
77 (self.execute)(lsp, editor, params)?;
78
79 Ok(ControlFlow::Break(()))
80 }
81 }
82
83 self.notification_arms.push(Box::new(NotificationArmImpl {
84 notification: PhantomData::<(N, L)>,
85 execute,
86 }));
87
88 self
89 }
90
91 pub fn on_request<R>(
92 mut self,
93 execute: impl Fn(&mut L, &mut dyn Editor<L>, R::Params) -> Fallible<R::Result> + 'l,
94 ) -> Self
95 where
96 R: request::Request + 'l,
97 {
98 struct RequestArmImpl<R, F, L> {
99 request: PhantomData<(R, L)>,
100 execute: F,
101 }
102
103 impl<L, R, F> RequestArm<L> for RequestArmImpl<R, F, L>
104 where
105 R: request::Request,
106 F: Fn(&mut L, &mut dyn Editor<L>, R::Params) -> Fallible<R::Result>,
107 {
108 fn execute(
109 &self,
110 lsp: &mut L,
111 editor: &mut dyn Editor<L>,
112 request: Request,
113 ) -> Fallible<ControlFlow<Response, Request>> {
114 if request.method != R::METHOD {
115 return Ok(ControlFlow::Continue(request));
116 }
117
118 let params: R::Params = serde_json::from_value(request.params)?;
119 let result = (self.execute)(lsp, editor, params)?;
120 let response = Response {
121 id: request.id,
122 result: Some(serde_json::to_value(result)?),
123 error: None,
124 };
125
126 Ok(ControlFlow::Break(response))
127 }
128 }
129
130 self.request_arms.push(Box::new(RequestArmImpl {
131 request: PhantomData::<(R, L)>,
132 execute,
133 }));
134
135 self
136 }
137
138 pub fn execute(mut self) -> Fallible<()> {
140 let (spawned_tasks_tx, spawned_tasks_rx) = std::sync::mpsc::channel::<SpawnedTask<L>>();
141 let (errors_tx, errors_rx) = std::sync::mpsc::channel::<dada_util::Error>();
142 let connection = self.connection.clone();
143 std::thread::scope(|scope| {
144 for message in &connection.receiver {
145 if let Message::Request(req) = &message
147 && self.connection.handle_shutdown(req)?
148 {
149 break;
150 }
151
152 self.receive(scope, spawned_tasks_tx.clone(), message)?;
154
155 while let Ok(task) = spawned_tasks_rx.try_recv() {
156 scope.spawn({
157 let fork: <L as Lsp>::Fork = self.lsp.fork();
158 let spawned_tasks_tx = spawned_tasks_tx.clone();
159 let errors_tx = errors_tx.clone();
160 let connection = &connection;
161 move || {
162 let mut editor = LspDispatchEditor {
163 connection,
164 spawned_tasks_tx,
165 };
166 match (task.task)(&fork, &mut editor) {
167 Ok(()) => (),
168 Err(err) => errors_tx.send(err).unwrap(),
169 }
170 }
171 });
172 }
173
174 if let Ok(err) = errors_rx.try_recv() {
175 return Err(err);
176 }
177 }
178
179 Ok(())
180 })
181 }
182
183 fn receive(
185 &mut self,
186 _scope: &Scope<'_, '_>,
187 spawned_tasks_tx: Sender<SpawnedTask<L>>,
188 message: Message,
189 ) -> Fallible<()> {
190 match message {
191 Message::Request(request) => {
192 let mut editor = LspDispatchEditor {
193 connection: &self.connection,
194 spawned_tasks_tx,
195 };
196
197 let mut req = request;
198 for arm in &self.request_arms {
199 match arm.execute(&mut self.lsp, &mut editor, req)? {
200 ControlFlow::Break(response) => {
201 self.connection.sender.send(Message::Response(response))?;
202 return Ok(());
203 }
204 ControlFlow::Continue(r) => req = r,
205 }
206 }
207
208 let response = Response {
210 id: req.id,
211 result: None,
212 error: Some(lsp_server::ResponseError {
213 code: lsp_server::ErrorCode::MethodNotFound as i32,
214 message: format!("Method not found: {}", req.method),
215 data: None,
216 }),
217 };
218 self.connection.sender.send(Message::Response(response))?;
219 Ok(())
220 }
221 Message::Response(_response) => Ok(()),
222 Message::Notification(mut notification) => {
223 let mut editor = LspDispatchEditor {
224 connection: &self.connection,
225 spawned_tasks_tx,
226 };
227 for arm in &self.notification_arms {
228 match arm.execute(&mut self.lsp, &mut editor, notification)? {
229 ControlFlow::Break(()) => break,
230 ControlFlow::Continue(n) => notification = n,
231 }
232 }
233 Ok(())
234 }
235 }
236 }
237}
238
239struct LspDispatchEditor<'scope, L: Lsp> {
240 connection: &'scope Connection,
241 spawned_tasks_tx: Sender<SpawnedTask<L>>,
242}
243
244impl<L: Lsp> LspDispatchEditor<'_, L> {
245 fn send_notification<N>(&self, params: N::Params) -> Fallible<()>
246 where
247 N: notification::Notification,
248 {
249 self.connection
250 .sender
251 .send(Message::Notification(Notification::new(
252 N::METHOD.to_string(),
253 params,
254 )))?;
255 Ok(())
256 }
257}
258
259impl<L: Lsp> Editor<L> for LspDispatchEditor<'_, L> {
260 fn show_message(
261 &mut self,
262 message_type: lsp_types::MessageType,
263 message: String,
264 ) -> Fallible<()> {
265 let params = lsp_types::ShowMessageParams {
266 typ: message_type,
267 message,
268 };
269
270 self.send_notification::<notification::ShowMessage>(params)?;
271
272 Ok(())
273 }
274
275 fn publish_diagnostics(&mut self, params: PublishDiagnosticsParams) -> Fallible<()> {
276 self.send_notification::<notification::PublishDiagnostics>(params)
277 }
278
279 fn spawn(
280 &mut self,
281 task: Box<dyn FnOnce(&<L as Lsp>::Fork, &mut dyn Editor<L>) -> Fallible<()> + Send>,
282 ) {
283 self.spawned_tasks_tx.send(SpawnedTask { task }).unwrap();
284 }
285}
286
287struct SpawnedTask<L: Lsp> {
288 #[allow(clippy::type_complexity)]
289 task: Box<dyn FnOnce(&<L as Lsp>::Fork, &mut dyn Editor<L>) -> Fallible<()> + Send>,
290}
291
292impl<L: Lsp> SpawnedTask<L> {}