Summary: fix, another bug 131 attempt, dispatch the
[sxemacs] / src / events / worker-asyneq.c
1 /*** worker-asyneq.c -- worker threads for asyneq feature
2  *
3  * Copyright (C) 2006-2008  Sebastian Freundt
4  *
5  * Author:  Sebastian Freundt <hroptatyr@sxemacs.org>
6  *
7  * This file is part of SXEmacs.
8  * 
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  *
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in the
18  *    documentation and/or other materials provided with the distribution.
19  *
20  * 3. Neither the name of the author nor the names of any contributors
21  *    may be used to endorse or promote products derived from this
22  *    software without specific prior written permission.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
25  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
26  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
27  * DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
28  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
31  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
32  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
33  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
34  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35  *
36  ***/
37
38 /* Synched up with: Not in FSF. */
39
40 #include <config.h>
41 #include "lisp.h"
42 #include "syssignal.h"
43 #include "worker-asyneq.h"
44 #define INCLUDE_EVENTS_H_PRIVATE_SPHERE
45 #include "events.h"
46
47 event_queue_t delegate_eq = Qnull_pointer;
48 static Lisp_Object Vdelegate_eq;
49 static void eq_worker_th_blksig(void);
50
51 static struct work_handler_s eat_yerself = {
52         NULL,                   /* markfun */
53         NULL,                   /* printer */
54         NULL                    /* finaliser */
55 };
56
57 \f
58 void
59 init_workers(int nthreads, sxe_thread_f handler)
60 {
61         pthread_attr_t attr;
62         int i;
63
64         pthread_attr_init(&attr);
65         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
66
67         for (i=0; i < nthreads; i++) {
68                 eq_worker_t eqw = eq_make_worker();
69                 /* value taken from SOUND_MAX_AUDIO_FRAME_SIZE */
70                 resize_worker_scratch(eqw, 48000*6*sizeof(uint32_t));
71                 xthread_create(
72                         &eq_worker_thread(eqw), &attr, handler, eqw);
73                 dllist_append(workers, eqw);
74         }
75
76         pthread_attr_destroy(&attr);
77         return;
78 }
79
80 void
81 fini_worker(eq_worker_t eqw)
82 {
83         xthread_join(eq_worker_thread(eqw), NULL);
84         SXE_MUTEX_UNLOCK(&eq_worker_mtx(eqw));
85         eq_free_worker(eqw);
86 }
87
88 \f
89 extern event_queue_t asyneq;
90
91 void
92 eq_worker_eaten_myself(eq_worker_t eqw)
93 {
94         Lisp_Object emev = Qnil;
95         struct gcpro gcpro1;
96
97         GCPRO1(emev);
98         emev = make_empty_event();
99         XEVENT(emev)->event_type = eaten_myself_event;
100         XEVENT(emev)->event.eaten_myself.worker = eqw;
101         eq_enqueue(asyneq, emev);
102         UNGCPRO;
103         return;
104 }
105
106 void
107 eq_worker_work_started(Lisp_Object job)
108 {
109         Lisp_Object wsev = Qnil;
110         struct gcpro gcpro1;
111
112         GCPRO1(wsev);
113         wsev = make_empty_event();
114         XEVENT(wsev)->event_type = work_started_event;
115         XEVENT(wsev)->event.work_started.job = job;
116         eq_enqueue(asyneq, wsev);
117         UNGCPRO;
118         return;
119 }
120
121 void
122 eq_worker_work_finished(Lisp_Object job)
123 {
124         Lisp_Object wfev = Qnil;
125         struct gcpro gcpro1;
126
127         GCPRO1(wfev);
128         wfev = make_empty_event();
129         XEVENT(wfev)->event_type = work_finished_event;
130         XEVENT(wfev)->event.work_finished.job = job;
131 #if 0
132         /* the original idea was to enqueue the event and let the dispatcher
133          * pick it up at some point in a FIFO fashion
134          * however that's gone up the drain with bug #131 ... */
135         eq_enqueue(asyneq, wfev);
136         asyneq_handle_event(wfev);
137 #else  /* !0 */
138         /* ... so let's try and dispatch the event directly
139          * while he's gc pro'd of course */
140         Fdispatch_event(wfev);
141 #endif  /* 0 */
142         UNGCPRO;
143         return;
144 }
145
146 void
147 eq_delegate_work(event_queue_t eq)
148 {
149         int cur = eq_queue_size(eq);
150         while (cur--) { 
151                 eq_queue_trigger(eq);
152         }
153         return;
154 }
155
156 \f
157 static void
158 eq_worker_th_blksig(void)
159 {
160         EMACS_BLOCK_SIGNAL(SIGINT);     /* ANSI */
161         EMACS_BLOCK_SIGNAL(SIGILL);     /* ANSI */
162         EMACS_BLOCK_SIGNAL(SIGABRT);    /* ANSI */
163         EMACS_BLOCK_SIGNAL(SIGFPE);     /* ANSI */
164         EMACS_BLOCK_SIGNAL(SIGSEGV);    /* ANSI */
165         EMACS_BLOCK_SIGNAL(SIGTERM);    /* ANSI */
166         
167 #if defined SIGHUP
168         EMACS_BLOCK_SIGNAL(SIGHUP);     /* POSIX */
169 #endif
170 #if defined SIGQUIT
171         EMACS_BLOCK_SIGNAL(SIGQUIT);    /* POSIX */
172 #endif
173 #if defined SIGTRAP
174         EMACS_BLOCK_SIGNAL(SIGTRAP);    /* POSIX */
175 #endif
176 #if defined SIGUSR1
177         EMACS_BLOCK_SIGNAL(SIGUSR1);    /* POSIX */
178 #endif
179 #if defined SIGUSR2
180         EMACS_BLOCK_SIGNAL(SIGUSR2);    /* POSIX */
181 #endif
182 #if defined SIGPIPE
183         EMACS_BLOCK_SIGNAL(SIGPIPE);    /* POSIX */
184 #endif
185 #if defined SIGALRM
186         EMACS_BLOCK_SIGNAL(SIGALRM);    /* POSIX */
187 #endif
188 #if defined SIGCHLD
189         EMACS_BLOCK_SIGNAL(SIGCHLD);    /* POSIX */
190 #endif
191 #if defined SIGCONT
192         EMACS_BLOCK_SIGNAL(SIGCONT);    /* POSIX */
193 #endif
194 #if defined SIGSTOP
195         EMACS_BLOCK_SIGNAL(SIGSTOP);    /* POSIX */
196 #endif
197 #if defined SIGTSTP
198         EMACS_BLOCK_SIGNAL(SIGTSTP);    /* POSIX */
199 #endif
200 #if defined SIGTTIN
201         EMACS_BLOCK_SIGNAL(SIGTTIN);    /* POSIX */
202 #endif
203 #if defined SIGTTOU
204         EMACS_BLOCK_SIGNAL(SIGTTOU);    /* POSIX */
205 #endif
206
207 #if defined SIGBUS
208         EMACS_BLOCK_SIGNAL(SIGBUS);     /* XPG5 */
209 #endif
210 #if defined SIGPOLL
211         EMACS_BLOCK_SIGNAL(SIGPOLL);    /* XPG5 */
212 #endif
213 #if defined SIGPROF
214         EMACS_BLOCK_SIGNAL(SIGPROF);    /* XPG5 */
215 #endif
216 #if defined SIGSYS
217         EMACS_BLOCK_SIGNAL(SIGSYS);     /* XPG5 */
218 #endif
219 #if defined SIGURG
220         EMACS_BLOCK_SIGNAL(SIGURG);     /* XPG5 */
221 #endif
222 #if defined SIGXCPU
223         EMACS_BLOCK_SIGNAL(SIGXCPU);    /* XPG5 */
224 #endif
225 #if defined SIGXFSZ
226         EMACS_BLOCK_SIGNAL(SIGXFSZ);    /* XPG5 */
227 #endif
228 #if defined SIGVTALRM
229         EMACS_BLOCK_SIGNAL(SIGVTALRM);  /* XPG5 */
230 #endif
231
232 #if defined SIGIO
233         EMACS_BLOCK_SIGNAL(SIGIO);      /* BSD 4.2 */
234 #endif
235 #if defined SIGWINCH
236         EMACS_BLOCK_SIGNAL(SIGWINCH);   /* BSD 4.3 */
237 #endif
238
239 #if defined SIGEMT
240         EMACS_BLOCK_SIGNAL(SIGEMT);
241 #endif
242 #if defined SIGINFO
243         EMACS_BLOCK_SIGNAL(SIGINFO);
244 #endif
245 #if defined SIGHWE
246         EMACS_BLOCK_SIGNAL(SIGHWE);
247 #endif
248 #if defined SIGPRE
249         EMACS_BLOCK_SIGNAL(SIGPRE);
250 #endif
251 #if defined SIGUME
252         EMACS_BLOCK_SIGNAL(SIGUME);
253 #endif
254 #if defined SIGDLK
255         EMACS_BLOCK_SIGNAL(SIGDLK);
256 #endif
257 #if defined SIGCPULIM
258         EMACS_BLOCK_SIGNAL(SIGCPULIM);
259 #endif
260 #if defined SIGIOT
261         EMACS_BLOCK_SIGNAL(SIGIOT);
262 #endif
263 #if defined SIGLOST
264 # if !defined HAVE_BDWGC || !defined EF_USE_BDWGC
265         EMACS_BLOCK_SIGNAL(SIGLOST);
266 # endif  /* BDWGC case */
267 #endif
268 #if defined SIGSTKFLT
269         EMACS_BLOCK_SIGNAL(SIGSTKFLT);
270 #endif
271 #if defined SIGUNUSED
272         EMACS_BLOCK_SIGNAL(SIGUNUSED);
273 #endif
274 #if defined SIGDANGER
275         EMACS_BLOCK_SIGNAL(SIGDANGER);  /* AIX */
276 #endif
277 #if defined SIGMSG
278         EMACS_BLOCK_SIGNAL(SIGMSG);
279 #endif
280 #if defined SIGSOUND
281         EMACS_BLOCK_SIGNAL(SIGSOUND);
282 #endif
283 #if defined SIGRETRACT
284         EMACS_BLOCK_SIGNAL(SIGRETRACT);
285 #endif
286 #if defined SIGGRANT
287         EMACS_BLOCK_SIGNAL(SIGGRANT);
288 #endif
289 #if defined SIGPWR
290 # if !defined HAVE_BDWGC || !defined EF_USE_BDWGC
291         EMACS_BLOCK_SIGNAL(SIGPWR);
292 # endif  /* BDWGC case */
293 #endif
294 }
295
296 static void *
297 eq_worker_th(void *eqwptr)
298 {
299         Lisp_Object ljob = Qnil;
300         worker_job_t job = NULL;
301         eq_worker_t eqw = eqwptr;
302         work_handler_t hdl;
303         struct gcpro gcpro1;
304
305         eq_worker_th_blksig();
306
307         GCPRO1(ljob);
308 listen:
309         eq_queue_synch(delegate_eq);
310
311         EQUEUE_DEBUG_WORKER("dequeuing thread: 0x%lx\n",
312                             (long unsigned int)pthread_self());
313
314         /* fetch one event now */
315 refetch:
316         ljob = Qnil;
317         eq_dequeue_pro(&ljob, delegate_eq);
318         if (NILP(ljob)) {
319                 EQUEUE_DEBUG_WORKER("No event on the queue. "
320                                     "Who dared to wake me up?! >8(\n");
321                 goto listen;
322         }
323
324         eq_lock_meself(eqw);
325         job = XWORKER_JOB(ljob);
326         hdl = XWORKER_JOB_HANDLER(ljob);
327         EQUEUE_DEBUG_WORKER("escrowing event 0x%lx in worker 0x%lx.\n",
328                             (long unsigned int)job,
329                             (long unsigned int)eqw);
330
331         /* maybe it's a eat-yourself ticket? */
332         if (hdl == &eat_yerself) {
333                 /* awww ... we gotta exit :( */
334                 EQUEUE_DEBUG_WORKER(
335                         "Worker 0x%lx commits suicide...\n",
336                         (long unsigned int)eqw);
337                 eq_unlock_meself(eqw);
338                 eq_worker_eaten_myself(eqw);
339                 UNGCPRO;
340                 pthread_exit(NULL);
341                 return NULL;
342         }
343
344         /* help the job a bit with local resources */
345         EQUEUE_DEBUG_SCRATCH("inherit scratch buffer 0x%lx of size %ld\n",
346                              (long unsigned int)eq_worker_scratch(eqw),
347                              eq_worker_scratch_alloc_size(eqw));
348         worker_job_buffer(job) = eq_worker_scratch(eqw);
349         worker_job_buffer_alloc_size(job) = eq_worker_scratch_alloc_size(eqw);
350
351         /* generate a started event and update job state */
352         worker_job_state(job) = WORKER_JOB_RUNNING;
353         eq_worker_work_started(ljob);
354
355         /* ... otherwise handle the event */
356         work_handler(hdl)(job);
357
358         /* generate a `finished' event,
359          * sentinel code shall be injected in the routine
360          * called by eq_worker_handle_event() */
361         worker_job_state(job) = WORKER_JOB_FINISHED;
362         eq_worker_work_finished(ljob);
363
364         eq_unlock_meself(eqw);
365
366         EQUEUE_DEBUG_WORKER("enqueuing thread: 0x%lx\n",
367                             (long unsigned int)pthread_self());
368         goto refetch;
369         /* not reached */
370         return NULL;
371 }
372
373 DEFUN("init-workers", Finit_workers, 1, 1, 0, /*
374 Initialise NUMBER-OF-WORKERS worker threads.
375 If called repeatedly this function does NOT add more workers
376 use `add-workers' instead.
377 */
378       (number_of_workers))
379 {
380         CHECK_NATNUM(number_of_workers);
381         if (dllist_get_size(workers) <= 1) {
382                 init_workers(XINT(number_of_workers), eq_worker_th);
383         }
384         return Qt;
385 }
386
387
388 DEFUN("add-workers", Fadd_workers, 1, 1, 0, /*
389 Add NUMBER-OF-WORKERS worker threads.
390 */
391       (number_of_workers))
392 {
393         CHECK_NATNUM(number_of_workers);
394         init_workers(XINT(number_of_workers), eq_worker_th);
395         return Qt;
396 }
397
398 DEFUN("remove-workers", Fremove_workers, 0, 1, 0, /*
399 Stop NUMBER-OF-WORKERS worker threads.  By default stop all.
400 Depending on whether there are busy this operation may block the
401 main execution loop until all worker threads are non-busy.
402 */
403       (number_of_workers))
404 {
405         Lisp_Object job = Qnil;
406         int i, noev = 0;        /* how many eat_yerself events to send? */
407         struct gcpro gcpro1;
408
409         if (NILP(number_of_workers)) {
410                 noev = dllist_get_size(workers)-1;
411         } else {
412                 CHECK_NATNUM(number_of_workers);
413                 noev = XINT(number_of_workers);
414         }
415
416         GCPRO1(job);
417         for (i = 0; i < noev; i++) {
418                 job = wrap_object(make_worker_job(&eat_yerself));
419                 eq_enqueue(delegate_eq, job);
420         }
421         eq_queue_trigger_all(delegate_eq);
422         UNGCPRO;
423         return job;
424 }
425
426 DEFUN("trigger-workers", Ftrigger_workers, 0, 0, 0, /*
427 Trigger all worker threads.
428 */
429       ())
430 {
431         eq_queue_trigger_all(delegate_eq);
432         return Qt;
433 }
434
435 DEFUN("running-workers", Frunning_workers, 0, 0, 0, /*
436 Return the number of currently running worker threads,
437 the main thread excluded.
438 */
439       ())
440 {
441         return make_int(dllist_get_size(workers)-1);
442 }
443
444 \f
445 void syms_of_worker_asyneq(void)
446 {
447         DEFSUBR(Finit_workers);
448         DEFSUBR(Fadd_workers);
449         DEFSUBR(Fremove_workers);
450         DEFSUBR(Ftrigger_workers);
451         DEFSUBR(Frunning_workers);
452 }
453
454 void reinit_vars_of_worker_asyneq(void)
455 {
456         /* the delegate queue in case of multiple threads */
457         delegate_eq = make_event_queue();
458         XSETEVENT_QUEUE(Vdelegate_eq, delegate_eq);
459         staticpro_nodump(&Vdelegate_eq);
460 }
461
462 void vars_of_worker_asyneq(void)
463 {
464         Fprovide(intern("asyneq"));
465 }
466
467 /* workers.c ends here */