Skip to main content

dada_lsp_server/lsp/
dispatch.rs

1use std::{
2    marker::PhantomData,
3    ops::ControlFlow,
4    sync::{Arc, mpsc::Sender},
5    thread::Scope,
6};
7
8use dada_util::Fallible;
9use lsp_server::{Connection, Message, Notification, Request, Response};
10use lsp_types::{PublishDiagnosticsParams, notification, request};
11
12use super::{Editor, Lsp};
13
14pub(super) struct LspDispatch<'l, L: Lsp + 'l> {
15    connection: Arc<Connection>,
16    lsp: L,
17    notification_arms: Vec<Box<dyn NotificationArm<L> + 'l>>,
18    request_arms: Vec<Box<dyn RequestArm<L> + 'l>>,
19}
20
21trait NotificationArm<L> {
22    fn execute(
23        &self,
24        context: &mut L,
25        editor: &mut dyn Editor<L>,
26        notification: Notification,
27    ) -> Fallible<ControlFlow<(), Notification>>;
28}
29
30trait RequestArm<L> {
31    fn execute(
32        &self,
33        context: &mut L,
34        editor: &mut dyn Editor<L>,
35        request: Request,
36    ) -> Fallible<ControlFlow<Response, Request>>;
37}
38
39impl<'l, L: Lsp + 'l> LspDispatch<'l, L> {
40    pub fn new(connection: Connection, lsp: L) -> Self {
41        Self {
42            lsp,
43            connection: Arc::new(connection),
44            notification_arms: vec![],
45            request_arms: vec![],
46        }
47    }
48
49    pub fn on_notification<N>(
50        mut self,
51        execute: impl Fn(&mut L, &mut dyn Editor<L>, N::Params) -> Fallible<()> + 'l,
52    ) -> Self
53    where
54        N: notification::Notification + 'l,
55    {
56        struct NotificationArmImpl<N, F, L> {
57            notification: PhantomData<(N, L)>,
58            execute: F,
59        }
60
61        impl<L, N, F> NotificationArm<L> for NotificationArmImpl<N, F, L>
62        where
63            N: notification::Notification,
64            F: Fn(&mut L, &mut dyn Editor<L>, N::Params) -> Fallible<()>,
65        {
66            fn execute(
67                &self,
68                lsp: &mut L,
69                editor: &mut dyn Editor<L>,
70                notification: Notification,
71            ) -> Fallible<ControlFlow<(), Notification>> {
72                if notification.method != N::METHOD {
73                    return Ok(ControlFlow::Continue(notification));
74                }
75
76                let params: N::Params = serde_json::from_value(notification.params)?;
77                (self.execute)(lsp, editor, params)?;
78
79                Ok(ControlFlow::Break(()))
80            }
81        }
82
83        self.notification_arms.push(Box::new(NotificationArmImpl {
84            notification: PhantomData::<(N, L)>,
85            execute,
86        }));
87
88        self
89    }
90
91    pub fn on_request<R>(
92        mut self,
93        execute: impl Fn(&mut L, &mut dyn Editor<L>, R::Params) -> Fallible<R::Result> + 'l,
94    ) -> Self
95    where
96        R: request::Request + 'l,
97    {
98        struct RequestArmImpl<R, F, L> {
99            request: PhantomData<(R, L)>,
100            execute: F,
101        }
102
103        impl<L, R, F> RequestArm<L> for RequestArmImpl<R, F, L>
104        where
105            R: request::Request,
106            F: Fn(&mut L, &mut dyn Editor<L>, R::Params) -> Fallible<R::Result>,
107        {
108            fn execute(
109                &self,
110                lsp: &mut L,
111                editor: &mut dyn Editor<L>,
112                request: Request,
113            ) -> Fallible<ControlFlow<Response, Request>> {
114                if request.method != R::METHOD {
115                    return Ok(ControlFlow::Continue(request));
116                }
117
118                let params: R::Params = serde_json::from_value(request.params)?;
119                let result = (self.execute)(lsp, editor, params)?;
120                let response = Response {
121                    id: request.id,
122                    result: Some(serde_json::to_value(result)?),
123                    error: None,
124                };
125
126                Ok(ControlFlow::Break(response))
127            }
128        }
129
130        self.request_arms.push(Box::new(RequestArmImpl {
131            request: PhantomData::<(R, L)>,
132            execute,
133        }));
134
135        self
136    }
137
138    /// Start receiving and dispatch messages. Blocks until a shutdown request is received.
139    pub fn execute(mut self) -> Fallible<()> {
140        let (spawned_tasks_tx, spawned_tasks_rx) = std::sync::mpsc::channel::<SpawnedTask<L>>();
141        let (errors_tx, errors_rx) = std::sync::mpsc::channel::<dada_util::Error>();
142        let connection = self.connection.clone();
143        std::thread::scope(|scope| {
144            for message in &connection.receiver {
145                // Check for shutdown requests:
146                if let Message::Request(req) = &message
147                    && self.connection.handle_shutdown(req)?
148                {
149                    break;
150                }
151
152                // Otherwise:
153                self.receive(scope, spawned_tasks_tx.clone(), message)?;
154
155                while let Ok(task) = spawned_tasks_rx.try_recv() {
156                    scope.spawn({
157                        let fork: <L as Lsp>::Fork = self.lsp.fork();
158                        let spawned_tasks_tx = spawned_tasks_tx.clone();
159                        let errors_tx = errors_tx.clone();
160                        let connection = &connection;
161                        move || {
162                            let mut editor = LspDispatchEditor {
163                                connection,
164                                spawned_tasks_tx,
165                            };
166                            match (task.task)(&fork, &mut editor) {
167                                Ok(()) => (),
168                                Err(err) => errors_tx.send(err).unwrap(),
169                            }
170                        }
171                    });
172                }
173
174                if let Ok(err) = errors_rx.try_recv() {
175                    return Err(err);
176                }
177            }
178
179            Ok(())
180        })
181    }
182
183    /// Given a message, find the handler (if any) and invoke it.
184    fn receive(
185        &mut self,
186        _scope: &Scope<'_, '_>,
187        spawned_tasks_tx: Sender<SpawnedTask<L>>,
188        message: Message,
189    ) -> Fallible<()> {
190        match message {
191            Message::Request(request) => {
192                let mut editor = LspDispatchEditor {
193                    connection: &self.connection,
194                    spawned_tasks_tx,
195                };
196
197                let mut req = request;
198                for arm in &self.request_arms {
199                    match arm.execute(&mut self.lsp, &mut editor, req)? {
200                        ControlFlow::Break(response) => {
201                            self.connection.sender.send(Message::Response(response))?;
202                            return Ok(());
203                        }
204                        ControlFlow::Continue(r) => req = r,
205                    }
206                }
207
208                // If we get here, no handler was found
209                let response = Response {
210                    id: req.id,
211                    result: None,
212                    error: Some(lsp_server::ResponseError {
213                        code: lsp_server::ErrorCode::MethodNotFound as i32,
214                        message: format!("Method not found: {}", req.method),
215                        data: None,
216                    }),
217                };
218                self.connection.sender.send(Message::Response(response))?;
219                Ok(())
220            }
221            Message::Response(_response) => Ok(()),
222            Message::Notification(mut notification) => {
223                let mut editor = LspDispatchEditor {
224                    connection: &self.connection,
225                    spawned_tasks_tx,
226                };
227                for arm in &self.notification_arms {
228                    match arm.execute(&mut self.lsp, &mut editor, notification)? {
229                        ControlFlow::Break(()) => break,
230                        ControlFlow::Continue(n) => notification = n,
231                    }
232                }
233                Ok(())
234            }
235        }
236    }
237}
238
239struct LspDispatchEditor<'scope, L: Lsp> {
240    connection: &'scope Connection,
241    spawned_tasks_tx: Sender<SpawnedTask<L>>,
242}
243
244impl<L: Lsp> LspDispatchEditor<'_, L> {
245    fn send_notification<N>(&self, params: N::Params) -> Fallible<()>
246    where
247        N: notification::Notification,
248    {
249        self.connection
250            .sender
251            .send(Message::Notification(Notification::new(
252                N::METHOD.to_string(),
253                params,
254            )))?;
255        Ok(())
256    }
257}
258
259impl<L: Lsp> Editor<L> for LspDispatchEditor<'_, L> {
260    fn show_message(
261        &mut self,
262        message_type: lsp_types::MessageType,
263        message: String,
264    ) -> Fallible<()> {
265        let params = lsp_types::ShowMessageParams {
266            typ: message_type,
267            message,
268        };
269
270        self.send_notification::<notification::ShowMessage>(params)?;
271
272        Ok(())
273    }
274
275    fn publish_diagnostics(&mut self, params: PublishDiagnosticsParams) -> Fallible<()> {
276        self.send_notification::<notification::PublishDiagnostics>(params)
277    }
278
279    fn spawn(
280        &mut self,
281        task: Box<dyn FnOnce(&<L as Lsp>::Fork, &mut dyn Editor<L>) -> Fallible<()> + Send>,
282    ) {
283        self.spawned_tasks_tx.send(SpawnedTask { task }).unwrap();
284    }
285}
286
287struct SpawnedTask<L: Lsp> {
288    #[allow(clippy::type_complexity)]
289    task: Box<dyn FnOnce(&<L as Lsp>::Fork, &mut dyn Editor<L>) -> Fallible<()> + Send>,
290}
291
292impl<L: Lsp> SpawnedTask<L> {}