Scippy

SCIP

Solving Constraint Integer Programs

tpi_tnycthrd.c
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and library */
4/* SCIP --- Solving Constraint Integer Programs */
5/* */
6/* Copyright (c) 2002-2024 Zuse Institute Berlin (ZIB) */
7/* */
8/* Licensed under the Apache License, Version 2.0 (the "License"); */
9/* you may not use this file except in compliance with the License. */
10/* You may obtain a copy of the License at */
11/* */
12/* http://www.apache.org/licenses/LICENSE-2.0 */
13/* */
14/* Unless required by applicable law or agreed to in writing, software */
15/* distributed under the License is distributed on an "AS IS" BASIS, */
16/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
17/* See the License for the specific language governing permissions and */
18/* limitations under the License. */
19/* */
20/* You should have received a copy of the Apache-2.0 license */
21/* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
22/* */
23/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
24
25/**@file tpi_tnycthrd.c
26 * @ingroup TASKINTERFACE
27 * @brief a TPI implementation using tinycthreads
28 * @author Stephen J. Maher
29 * @author Leona Gottwald
30 * @author Marc Pfetsch
31 */
32
33/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
34
35#include "tpi/tpi.h"
37#include "tinycthread/tinycthread.h"
38#include "scip/pub_message.h"
39#include "scip/pub_misc.h"
40
/* macros for direct access: thin wrappers that translate the tinycthread status code into a SCIP return code */

/* lock */
#define SCIPtnyInitLock(lock)                 ( mtx_init((lock), mtx_plain) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyDestroyLock(lock)              ( mtx_destroy(lock) )
#define SCIPtnyAcquireLock(lock)              ( mtx_lock(lock) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyReleaseLock(lock)              ( mtx_unlock(lock) == thrd_success ? SCIP_OKAY : SCIP_ERROR )

/* condition */
#define SCIPtnyInitCondition(condition)       ( cnd_init(condition) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyDestroyCondition(condition)    ( cnd_destroy(condition) )
#define SCIPtnySignalCondition(condition)     ( cnd_signal(condition) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyBroadcastCondition(condition)  ( cnd_broadcast(condition) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyWaitCondition(condition, lock) ( cnd_wait((condition), (lock)) == thrd_success ? SCIP_OKAY: SCIP_ERROR )
55
56/** struct containing lock */
57struct SCIP_Lock
58{
59 mtx_t lock;
60};
61
62/** struct containing condition */
63struct SCIP_Condition
64{
65 cnd_t condition;
66};
67
68
70static SCIP_THREADPOOL* _threadpool = NULL;
71_Thread_local int _threadnumber; /*lint !e129*/
72
73/** A job added to the queue */
74struct SCIP_Job
75{
76 int jobid; /**< id to identify jobs from a common process */
77 struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
78 SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
79 void* args; /**< pointer to the function arguments */
80 SCIP_RETCODE retcode; /**< return code of the job */
81};
82
83/** the thread pool job queue */
84struct SCIP_JobQueue
85{
86 SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
87 SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
88 int njobs; /**< number of jobs in the queue */
89};
91
92/** The thread pool */
94{
95 /* Pool Characteristics */
96 int nthreads; /**< number of threads in the pool */
97 int queuesize; /**< the total number of items to enter the queue */
98
99 /* Current pool state */
100 thrd_t* threads; /**< the threads included in the pool */
101 SCIP_JOBQUEUE* jobqueue; /**< the job queue */
102 SCIP_JOBQUEUE* currentjobs; /**< the jobs currently being processed on a thread;
103 * only a single job is allowed per thread. */
104 SCIP_JOBQUEUE* finishedjobs; /**< finished jobs that are not yet collected */
105 int currworkingthreads; /**< the threads currently processing jobs */
106 SCIP_Bool blockwhenfull; /**< indicates that the queue can only be as large as nthreads */
107 int currentid; /**< current job id */
108
109 /* Control indicators */
110 SCIP_Bool shutdown; /**< indicates whether the pool needs to be shut down */
111 SCIP_Bool queueopen; /**< indicates whether the queue is open */
112
113 /* mutex and locks for the thread pool */
114 mtx_t poollock; /**< mutex to allow read and write of the pool features */
115 cnd_t queuenotempty; /**< condition to broadcast the queue has jobs */
116 cnd_t queuenotfull; /**< condition to broadcast the queue is not full */
117 cnd_t queueempty; /**< condition to broadcast that the queue is empty */
118 cnd_t jobfinished; /**< condition to broadcast that a job has been finished */
119};
120
121/** this function controls the execution of each of the threads */
122static
124 void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
125 )
126{
127 SCIP_JOB* newjob;
128 SCIP_JOB* prevjob;
129 SCIP_JOB* currjob;
130
131 _threadnumber = (int)(uintptr_t) threadnum;
132
133 /* Increase the number of active threads */
134 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
135 _threadpool->currworkingthreads += 1;
136 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
137
138 /* this is an endless loop that runs until the thrd_exit function is called */
139 while( TRUE ) /*lint !e716*/
140 {
141 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
142
143 /* the queue is empty but the shutdown command has not been given */
144 while( _threadpool->jobqueue->njobs == 0 && !_threadpool->shutdown )
145 {
146 SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotempty), &(_threadpool->poollock)) );
147 }
148
149 /* if the shutdown command has been given, then exit the thread */
150 if( _threadpool->shutdown )
151 {
152 /* Decrease the thread count when execution of job queue has completed */
153 _threadpool->currworkingthreads -= 1;
154 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
155
156 thrd_exit((int)SCIP_OKAY);
157 }
158
159 /* getting the next job in the queue */
160 newjob = _threadpool->jobqueue->firstjob;
161 _threadpool->jobqueue->njobs--; /* decreasing the number of jobs in the queue */
162
163 if( _threadpool->jobqueue->njobs == 0 )
164 {
165 _threadpool->jobqueue->firstjob = NULL;
166 _threadpool->jobqueue->lastjob = NULL;
167 }
168 else
169 _threadpool->jobqueue->firstjob = newjob->nextjob; /* updating the queue */
170
171 /* if we want to wait when the queue is full, then we broadcast that the queue can now take new jobs */
172 if( _threadpool->blockwhenfull &&
173 _threadpool->jobqueue->njobs == _threadpool->queuesize - 1 )
174 {
176 }
177
178 /* indicating that the queue is empty */
179 if( _threadpool->jobqueue->njobs == 0 )
180 {
182 }
183
184 /* updating the current job list */
185 if( _threadpool->currentjobs->njobs == 0 )
186 {
187 _threadpool->currentjobs->firstjob = newjob;
188 _threadpool->currentjobs->lastjob = newjob;
189 }
190 else
191 {
192 _threadpool->currentjobs->lastjob->nextjob = newjob;
193 _threadpool->currentjobs->lastjob = newjob;
194 }
195
196 _threadpool->currentjobs->njobs++;
197
198 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
199
200 /* setting the job to run on this thread */
201 newjob->retcode = (*(newjob->jobfunc))(newjob->args);
202
203 /* setting the current job on this thread to NULL */
204 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
205
206 /* finding the location of the processed job in the currentjobs queue */
207 currjob = _threadpool->currentjobs->firstjob;
208 prevjob = NULL;
209
210 while( currjob != newjob )
211 {
212 prevjob = currjob;
213 currjob = prevjob->nextjob;
214 }
215
216 /* removing the processed job from current jobs list */
217 if( currjob == _threadpool->currentjobs->firstjob )
218 _threadpool->currentjobs->firstjob = currjob->nextjob;
219 else
220 prevjob->nextjob = currjob->nextjob; /*lint !e794*/
221
222 if( currjob == _threadpool->currentjobs->lastjob )
223 _threadpool->currentjobs->lastjob = prevjob;
224
225 _threadpool->currentjobs->njobs--;
226
227 /* updating the finished job list */
228 if( _threadpool->finishedjobs->njobs == 0 )
229 {
230 _threadpool->finishedjobs->firstjob = newjob;
231 _threadpool->finishedjobs->lastjob = newjob;
232 }
233 else
234 {
235 _threadpool->finishedjobs->lastjob->nextjob = newjob;
236 _threadpool->finishedjobs->lastjob = newjob;
237 }
238
239 _threadpool->finishedjobs->njobs++;
240
241 /* signalling that a job has been finished */
242 SCIP_CALL( SCIPtnyBroadcastCondition(&(_threadpool)->jobfinished) );
243
244 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
245 }
246}
247
/** thread entry point handed to thrd_create(); adapts threadPoolThreadRetcode() to the int return type of a
 *  tinycthread start function
 */
static
int threadPoolThread(
   void*                 threadnum           /**< thread number is passed in as argument stored inside a void pointer */
   )
{
   return (int) threadPoolThreadRetcode(threadnum);
}
256
257/** creates a threadpool */
258static
260 SCIP_THREADPOOL** thrdpool, /**< pointer to store threadpool */
261 int nthreads, /**< number of threads in the threadpool */
262 int qsize, /**< maximum size of the jobqueue */
263 SCIP_Bool blockwhenfull /**< should the jobqueue block if it is full */
264 )
265{
266 uintptr_t i;
267
268 assert(nthreads >= 0);
269 assert(qsize >= 0);
270
271 /* @todo think about the correct memory here */
272 SCIP_ALLOC( BMSallocMemory(thrdpool) );
273 (*thrdpool)->currentid = 0;
274 (*thrdpool)->queuesize = qsize;
275 (*thrdpool)->nthreads = nthreads;
276 (*thrdpool)->blockwhenfull = blockwhenfull;
277 (*thrdpool)->shutdown = FALSE;
278 (*thrdpool)->queueopen = TRUE;
279
280 /* allocating memory for the job queue */
281 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->jobqueue) );
282 (*thrdpool)->jobqueue->firstjob = NULL;
283 (*thrdpool)->jobqueue->lastjob = NULL;
284 (*thrdpool)->jobqueue->njobs = 0;
285
286 /* allocating memory for the job queue */
287 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->currentjobs) );
288 (*thrdpool)->currentjobs->firstjob = NULL;
289 (*thrdpool)->currentjobs->lastjob = NULL;
290 (*thrdpool)->currentjobs->njobs = 0;
291
292 /* allocating memory for the job queue */
293 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->finishedjobs) );
294 (*thrdpool)->finishedjobs->firstjob = NULL;
295 (*thrdpool)->finishedjobs->lastjob = NULL;
296 (*thrdpool)->finishedjobs->njobs = 0;
297
298 /* initialising the mutex */
299 SCIP_CALL( SCIPtnyInitLock(&(*thrdpool)->poollock) ); /*lint !e2482*/
300
301 /* initialising the conditions */
302 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotempty) );
303 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotfull) );
304 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queueempty) );
305 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->jobfinished) );
306
307 /* creating the threads */
308 (*thrdpool)->currworkingthreads = 0;
309
310 /* allocating memory for the threads */
311 SCIP_ALLOC( BMSallocMemoryArray(&((*thrdpool)->threads), nthreads) );
312
313 /* create the threads */
314 for( i = 0; i < (unsigned)nthreads; i++ )
315 {
316 if( thrd_create(&((*thrdpool)->threads[i]), threadPoolThread, (void*)i) != thrd_success )
317 return SCIP_ERROR;
318 }
319
320 _threadnumber = nthreads;
321 /* halt while all threads are not active TODO: is synchronization required here ? */
322 /*TODO: this caused a deadlock, is it important to wait for all threads to start?
323 * while( (*thrdpool)->currworkingthreads != nthreads )
324 {}*/
325
326 return SCIP_OKAY;
327}
328
329/** adding a job to the job queue.
330 *
331 * This gives some more flexibility in the handling of new jobs.
332 * This function needs to be called from within a mutex.
333 */
334static
336 SCIP_THREADPOOL* threadpool, /**< pointer to store threadpool */
337 SCIP_JOB* newjob /**< pointer to new job */
338 )
339{
340 /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
341 /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
342 assert(threadpool->jobqueue->njobs < threadpool->queuesize);
343
344 newjob->nextjob = NULL;
345
346 /* checking the status of the job queue */
347 if( threadpool->jobqueue->njobs == 0 )
348 {
349 threadpool->jobqueue->firstjob = newjob;
350 threadpool->jobqueue->lastjob = newjob;
351 }
352 else /* it is assumed that the jobqueue is not full */
353 {
354 threadpool->jobqueue->lastjob->nextjob = newjob;
355 threadpool->jobqueue->lastjob = newjob;
356 }
357
358 /* signalling to all threads that the queue has jobs using the signal instead of broadcast because only one thread
359 * should be awakened */
361
362 threadpool->jobqueue->njobs++;
363}
364
365/** adds a job to the threadpool */
366static
368 SCIP_JOB* newjob, /**< job to add to threadpool */
369 SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
370 )
371{
372 assert(newjob != NULL);
373 assert(_threadpool != NULL);
374
375 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
376
377 /* if the queue is full and we are blocking, then return an error. */
378 if( _threadpool->jobqueue->njobs == _threadpool->queuesize && _threadpool->blockwhenfull )
379 {
380 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
381 *status = SCIP_SUBMIT_QUEUEFULL;
382 return SCIP_OKAY;
383 }
384
385 /* Wait until the job queue is not full. If the queue is closed or the thread pool is shut down, then stop waiting. */
386 /* @todo this needs to be checked. It is possible that a job can be submitted and then the queue is closed or the
387 * thread pool is shut down. Need to work out the best way to handle this. */
388 while( _threadpool->jobqueue->njobs == _threadpool->queuesize && !(_threadpool->shutdown || !_threadpool->queueopen) )
389 {
390 SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotfull), &(_threadpool->poollock)) );
391 }
392
393 /* if the thread pool is shut down or the queue is closed, then we need to leave the job submission */
394 if( !_threadpool->queueopen )
395 {
396 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
397 *status = SCIP_SUBMIT_QUEUECLOSED;
398 return SCIP_OKAY;
399 }
400 else if( _threadpool->shutdown )
401 {
402 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
403 *status = SCIP_SUBMIT_SHUTDOWN;
404 return SCIP_OKAY;
405 }
406
407 /* creating the job for submission */
408 newjob->nextjob = NULL;
409
410 /* adding the job to the queue */
411 /* this can only happen if the queue is not full */
412 assert(_threadpool->jobqueue->njobs != _threadpool->queuesize);
413 jobQueueAddJob(_threadpool, newjob);
414
415 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
416
417 *status = SCIP_SUBMIT_SUCCESS;
418
419 return SCIP_OKAY;
420}
421
422/** frees the jobqueue of the threadpool */
423static
425 SCIP_THREADPOOL* thrdpool /**< pointer to thread pool */
426 )
427{
428 SCIP_JOB* currjob;
429
430 assert(!thrdpool->queueopen);
431 assert(thrdpool->shutdown);
432
433 /* iterating through all jobs until all have been freed */
434 while( thrdpool->jobqueue->firstjob != NULL )
435 {
436 currjob = thrdpool->jobqueue->firstjob->nextjob;
437 thrdpool->jobqueue->firstjob = thrdpool->jobqueue->firstjob->nextjob;
438 BMSfreeMemory(&currjob);
439 }
440
441 assert(thrdpool->jobqueue->firstjob == NULL);
442 assert(thrdpool->jobqueue->lastjob == NULL);
443
444 BMSfreeMemory(&thrdpool->jobqueue);
445}
446
447/** free the thread pool */
448static
450 SCIP_THREADPOOL** thrdpool, /**< pointer to thread pool */
451 SCIP_Bool finishjobs, /**< currently unused */
452 SCIP_Bool completequeue /**< Wait until the queue has complete? */
453 )
454{
455 int i;
456 SCIP_RETCODE retcode;
457
458 /*TODO remove argument? */
459 SCIP_UNUSED( finishjobs );
460
461 SCIP_CALL( SCIPtnyAcquireLock(&((*thrdpool)->poollock)) );
462
463 /* if the shutdown is already in progress, then we don't need to complete this function */
464 if( !(*thrdpool)->queueopen || (*thrdpool)->shutdown )
465 {
466 SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
467
468 return SCIP_OKAY;
469 }
470
471 /* indicating that the job queue is now closed for new jobs */
472 (*thrdpool)->queueopen = FALSE;
473
474 /* if the jobs in the queue should be completed, then we wait until the queueempty condition is set */
475 if( completequeue )
476 {
477 while( (*thrdpool)->jobqueue->njobs > 0 )
478 {
479 SCIP_CALL( SCIPtnyWaitCondition(&((*thrdpool)->queueempty), &((*thrdpool)->poollock)) );
480 }
481 }
482
483 /* indicating that the tpi has commenced the shutdown process */
484 (*thrdpool)->shutdown = TRUE;
485
486 SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
487
488 /* waking up all threads so that they can check the shutdown condition;
489 * this requires that the conditions queuenotempty and queuenotfull is broadcast
490 */
491 SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotempty)) );
492 SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotfull)) );
493
494 retcode = SCIP_OKAY;
495
496 /* calling a join to ensure that all worker finish before the thread pool is closed */
497 for( i = 0; i < (*thrdpool)->nthreads; i++ )
498 {
499 int thrdretcode;
500
501 if( thrd_join((*thrdpool)->threads[i], &thrdretcode) != thrd_success )
502 retcode = (SCIP_RETCODE) MIN((int)SCIP_ERROR, (int)retcode);
503 else
504 retcode = (SCIP_RETCODE) MIN(thrdretcode, (int)retcode);
505 }
506
507 /* freeing memory and data structures */
508 BMSfreeMemoryArray(&(*thrdpool)->threads);
509
510 /* Freeing the current jobs list. This assumes that all jobs complete before the tpi is closed. */
511 assert((*thrdpool)->currentjobs->njobs == 0);
512 BMSfreeMemory(&(*thrdpool)->currentjobs);
513 assert((*thrdpool)->finishedjobs->njobs == 0);
514 BMSfreeMemory(&(*thrdpool)->finishedjobs);
515
516 freeJobQueue(*thrdpool);
517
518 /* destroying the conditions */
519 SCIPtnyDestroyCondition(&(*thrdpool)->jobfinished);
520 SCIPtnyDestroyCondition(&(*thrdpool)->queueempty);
521 SCIPtnyDestroyCondition(&(*thrdpool)->queuenotfull);
522 SCIPtnyDestroyCondition(&(*thrdpool)->queuenotempty);
523
524 /* destroying the mutex */
525 SCIPtnyDestroyLock(&(*thrdpool)->poollock);
526
527 BMSfreeMemory(thrdpool);
528
529 return retcode;
530}
531
532
533/* checking a job queue */
534static
536 SCIP_JOBQUEUE* jobqueue, /**< pointer to the job queue */
537 int jobid /**< id of job to check */
538 )
539{
540 SCIP_JOB* currjob = jobqueue->firstjob;
541
542 /* checking the job ids */
543 if( currjob != NULL )
544 {
545 while( currjob != jobqueue->lastjob )
546 {
547 if( currjob->jobid == jobid )
548 return SCIP_JOB_INQUEUE;
549
550 currjob = currjob->nextjob;
551 }
552
553 if( currjob->jobid == jobid )
554 return SCIP_JOB_INQUEUE;
555 }
556
558}
559
560/** returns whether the job id is running */
561static
563 SCIP_JOBQUEUE* currentjobs, /**< queue of current jobs */
564 int jobid /**< id of job to check */
565 )
566{
567 if( checkJobQueue(currentjobs, jobid) == SCIP_JOB_INQUEUE )
568 return TRUE;
569 else
570 return FALSE;
571}
572
573/** returns the number of threads */
575 void
576 )
577{
578 return _threadpool != NULL ? _threadpool->nthreads : 0;
579}
580
581/** initializes tpi */
583 int nthreads, /**< the number of threads to be used */
584 int queuesize, /**< the size of the queue */
585 SCIP_Bool blockwhenfull /**< should the queue block when full */
586 )
587{
588 assert(_threadpool == NULL);
589 SCIP_CALL( createThreadPool(&_threadpool, nthreads, queuesize, blockwhenfull) );
590 return SCIP_OKAY;
591}
592
593/** deinitializes tpi */
595 void
596 )
597{
598 assert(_threadpool != NULL);
599
600 SCIP_CALL( freeThreadPool(&_threadpool, TRUE, TRUE) );
601
602 return SCIP_OKAY;
603}
604
605/** creates a job for parallel processing */
607 SCIP_JOB** job, /**< pointer to the job that will be created */
608 int jobid, /**< the id for the current job */
609 SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
610 void* jobarg /**< the job's argument */
611 )
612{
614
615 (*job)->jobid = jobid;
616 (*job)->jobfunc = jobfunc;
617 (*job)->args = jobarg;
618 (*job)->nextjob = NULL;
619
620 return SCIP_OKAY;
621}
622
623/** get a new job id for the new set of submitted jobs */
625 void
626 )
627{
628 int id;
629 assert(_threadpool != NULL);
630
632 id = ++_threadpool->currentid;
634
635 return id;
636}
637
638/** submit a job for parallel processing; the return value is a globally defined status */
640 SCIP_JOB* job, /**< pointer to the job to be submitted */
641 SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
642 )
643{
644 assert(job != NULL);
645
646 /* the job id must be set before submitting the job. The submitter controls whether a new id is required. */
647 assert(job->jobid == _threadpool->currentid);
648 SCIP_CALL( threadPoolAddWork(job, status) );
649
650 return SCIP_OKAY;
651}
652
653/** blocks until all jobs of the given jobid have finished
654 * and then returns the smallest SCIP_RETCODE of all the jobs
655 */
657 int jobid /**< the jobid of the jobs to wait for */
658 )
659{
660 SCIP_RETCODE retcode;
661 SCIP_JOB* currjob;
662 SCIP_JOB* prevjob;
663
664 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
665
666 while( isJobRunning(_threadpool->currentjobs, jobid) || isJobRunning(_threadpool->jobqueue, jobid) )
667 {
668 SCIP_CALL( SCIPtnyWaitCondition(&_threadpool->jobfinished, &_threadpool->poollock) );
669 }
670
671 /* finding the location of the processed job in the currentjobs queue */
672 retcode = SCIP_OKAY;
673 currjob = _threadpool->finishedjobs->firstjob;
674 prevjob = NULL;
675
676 while( currjob )
677 {
678 if( currjob->jobid == jobid )
679 {
680 SCIP_JOB* nextjob;
681
682 /* if the job has the right jobid collect its retcode,
683 * remove it from the finished job list, and free it
684 */
685 retcode = MIN(retcode, currjob->retcode);
686
687 /* removing the finished job from finished jobs list */
688 if( currjob == _threadpool->finishedjobs->firstjob )
689 {
690 _threadpool->finishedjobs->firstjob = currjob->nextjob;
691 }
692 else
693 {
694 assert(prevjob != NULL);
695 prevjob->nextjob = currjob->nextjob;
696 }
697
698 if( currjob == _threadpool->finishedjobs->lastjob )
699 _threadpool->finishedjobs->lastjob = prevjob;
700
701 _threadpool->finishedjobs->njobs--;
702
703 /* update currjob and free finished job; prevjob stays the same */
704 nextjob = currjob->nextjob;
705 BMSfreeMemory(&currjob);
706 currjob = nextjob;
707 }
708 else
709 {
710 /* otherwise leave job untouched */
711 prevjob = currjob;
712 currjob = prevjob->nextjob;
713 }
714 }
715
716 SCIP_CALL( SCIPtnyReleaseLock(&_threadpool->poollock) );
717
718 return retcode;
719}
720
721
722/*
723 * locks
724 */
725
726/** initializes the given lock */
728 SCIP_LOCK** lock /**< the lock */
729 )
730{
731 assert(lock != NULL);
732
733 SCIP_ALLOC( BMSallocMemory(lock) );
734
735 if( mtx_init(&(*lock)->lock, mtx_plain) == thrd_success )
736 return SCIP_OKAY;
737 else
738 {
739 BMSfreeMemory(lock);
740 return SCIP_ERROR;
741 }
742}
743
744/** destroys the given lock */
746 SCIP_LOCK** lock /**< the lock */
747 )
748{
749 assert(lock != NULL);
750
751 mtx_destroy(&(*lock)->lock);
752 BMSfreeMemory(lock);
753}
754
755/** acquires the given lock */
757 SCIP_LOCK* lock /**< the lock */
758 )
759{
760 if( mtx_lock(&lock->lock) == thrd_success )
761 return SCIP_OKAY;
762 return SCIP_ERROR;
763}
764
765/** releases the given lock */
767 SCIP_LOCK* lock /**< the lock */
768 )
769{
770 if( mtx_unlock(&lock->lock) == thrd_success )
771 return SCIP_OKAY;
772 return SCIP_ERROR;
773}
774
775
776/*
777 * conditions
778 */
779
780/** initializes the given condition variable */
782 SCIP_CONDITION** condition /**< condition to be created and initialized */
783 )
784{
785 assert(condition != NULL);
786
787 SCIP_ALLOC( BMSallocMemory(condition) );
788
789 if( cnd_init(&(*condition)->condition) == thrd_success )
790 return SCIP_OKAY;
791 return SCIP_ERROR;
792}
793
794/** destroys the given condition variable */
796 SCIP_CONDITION** condition /**< condition to be destroyed and freed */
797 )
798{
799 cnd_destroy(&(*condition)->condition);
800 BMSfreeMemory(condition);
801}
802
803/** signals one waiting thread */
805 SCIP_CONDITION* condition /**< the condition variable to signal */
806 )
807{
808 if( cnd_signal(&condition->condition) == thrd_success )
809 return SCIP_OKAY;
810 return SCIP_ERROR;
811}
812
813/** signals all waiting threads */
814SCIP_EXPORT
816 SCIP_CONDITION* condition /**< the condition variable to broadcast */
817 )
818{
819 if( cnd_broadcast(&condition->condition) == thrd_success )
820 return SCIP_OKAY;
821 return SCIP_ERROR;
822}
823
824/** waits on a condition variable. The given lock must be held by the caller and will
825 * be held when this function returns.
826 */
828 SCIP_CONDITION* condition, /**< the condition variable to wait on */
829 SCIP_LOCK* lock /**< the lock that is held by the caller */
830 )
831{
832 if( cnd_wait(&condition->condition, &lock->lock) == thrd_success )
833 return SCIP_OKAY;
834 return SCIP_ERROR;
835}
836
837/** returns the thread number */
839 void
840 )
841{
842 return _threadnumber;
843}
844
845/** indicate whether a working TPI is available */
847{
848 return TRUE;
849}
850
851/** get name of library that the TPI interfaces to */
853 char* name, /**< buffer to store name */
854 int namesize /**< length of name buffer */
855 )
856{
857 assert(name != NULL);
858
859 (void) SCIPsnprintf(name, namesize, "TinyCThread %d.%d", TINYCTHREAD_VERSION_MAJOR, TINYCTHREAD_VERSION_MINOR);
860}
861
862/** get description of library that the TPI interfaces to */
864 char* desc, /**< buffer to store description */
865 int descsize /**< length of description */
866 )
867{
868 assert(desc != NULL);
869
870 (void) SCIPsnprintf(desc, descsize, "small portable implementation of the C11 threads API (tinycthread.github.io)");
871}
#define NULL
Definition: def.h:266
#define SCIP_UNUSED(x)
Definition: def.h:427
#define SCIP_Bool
Definition: def.h:91
#define MIN(x, y)
Definition: def.h:242
#define SCIP_ALLOC(x)
Definition: def.h:384
#define TRUE
Definition: def.h:93
#define FALSE
Definition: def.h:94
#define SCIP_CALL_ABORT(x)
Definition: def.h:352
#define SCIP_CALL(x)
Definition: def.h:373
int SCIPsnprintf(char *t, int len, const char *s,...)
Definition: misc.c:10880
memory allocation routines
#define BMSfreeMemory(ptr)
Definition: memory.h:145
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:123
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:147
#define BMSallocMemory(ptr)
Definition: memory.h:118
public methods for message output
public data structures and miscellaneous methods
SCIP_JOB * lastjob
Definition: tpi_openmp.c:85
SCIP_JOB * firstjob
Definition: tpi_openmp.c:84
SCIP_RETCODE retcode
Definition: tpi_openmp.c:78
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:75
void * args
Definition: tpi_openmp.c:77
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:76
int jobid
Definition: tpi_openmp.c:74
mtx_t lock
Definition: tpi_tnycthrd.c:59
omp_lock_t lock
Definition: tpi_openmp.c:58
SCIP_Bool queueopen
Definition: tpi_tnycthrd.c:111
SCIP_JOBQUEUE * jobqueue
Definition: tpi_tnycthrd.c:101
SCIP_Bool shutdown
Definition: tpi_tnycthrd.c:110
SCIP_Bool blockwhenfull
Definition: tpi_tnycthrd.c:106
SCIP_JOBQUEUE * currentjobs
Definition: tpi_tnycthrd.c:102
thrd_t * threads
Definition: tpi_tnycthrd.c:100
SCIP_JOBQUEUE * finishedjobs
Definition: tpi_tnycthrd.c:104
the type definitions for the SCIP parallel interface
static SCIP_RETCODE threadPoolAddWork(SCIP_JOB *newjob, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:367
static SCIP_JOBSTATUS checkJobQueue(SCIP_JOBQUEUE *jobqueue, int jobid)
Definition: tpi_tnycthrd.c:535
SCIP_Bool SCIPtpiIsAvailable(void)
Definition: tpi_tnycthrd.c:846
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:827
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_tnycthrd.c:606
static SCIP_RETCODE threadPoolThreadRetcode(void *threadnum)
Definition: tpi_tnycthrd.c:123
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_tnycthrd.c:804
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:756
#define SCIPtnyInitCondition(condition)
Definition: tpi_tnycthrd.c:50
static SCIP_RETCODE createThreadPool(SCIP_THREADPOOL **thrdpool, int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:259
static void jobQueueAddJob(SCIP_THREADPOOL *threadpool, SCIP_JOB *newjob)
Definition: tpi_tnycthrd.c:335
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_tnycthrd.c:594
#define SCIPtnyInitLock(lock)
Definition: tpi_tnycthrd.c:44
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_tnycthrd.c:815
#define SCIPtnyBroadcastCondition(condition)
Definition: tpi_tnycthrd.c:53
SCIP_RETCODE SCIPtpiSubmitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:639
static SCIP_Bool isJobRunning(SCIP_JOBQUEUE *currentjobs, int jobid)
Definition: tpi_tnycthrd.c:562
void SCIPtpiDestroyLock(SCIP_LOCK **lock)
Definition: tpi_tnycthrd.c:745
void SCIPtpiGetLibraryDesc(char *desc, int descsize)
Definition: tpi_tnycthrd.c:863
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_tnycthrd.c:656
#define SCIPtnyWaitCondition(condition, lock)
Definition: tpi_tnycthrd.c:54
#define SCIPtnyReleaseLock(lock)
Definition: tpi_tnycthrd.c:47
#define SCIPtnyDestroyCondition(condition)
Definition: tpi_tnycthrd.c:51
#define SCIPtnySignalCondition(condition)
Definition: tpi_tnycthrd.c:52
int SCIPtpiGetThreadNum(void)
Definition: tpi_tnycthrd.c:838
int SCIPtpiGetNumThreads(void)
Definition: tpi_tnycthrd.c:574
void SCIPtpiDestroyCondition(SCIP_CONDITION **condition)
Definition: tpi_tnycthrd.c:795
int SCIPtpiGetNewJobID(void)
Definition: tpi_tnycthrd.c:624
#define SCIPtnyDestroyLock(lock)
Definition: tpi_tnycthrd.c:45
static int threadPoolThread(void *threadnum)
Definition: tpi_tnycthrd.c:250
void SCIPtpiGetLibraryName(char *name, int namesize)
Definition: tpi_tnycthrd.c:852
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK **lock)
Definition: tpi_tnycthrd.c:727
static SCIP_RETCODE freeThreadPool(SCIP_THREADPOOL **thrdpool, SCIP_Bool finishjobs, SCIP_Bool completequeue)
Definition: tpi_tnycthrd.c:449
static void freeJobQueue(SCIP_THREADPOOL *thrdpool)
Definition: tpi_tnycthrd.c:424
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:766
SCIP_RETCODE SCIPtpiInitCondition(SCIP_CONDITION **condition)
Definition: tpi_tnycthrd.c:781
#define SCIPtnyAcquireLock(lock)
Definition: tpi_tnycthrd.c:46
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:582
@ SCIP_OKAY
Definition: type_retcode.h:42
@ SCIP_ERROR
Definition: type_retcode.h:43
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:63
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:50
@ SCIP_JOB_DOESNOTEXIST
Definition: type_tpi.h:60
@ SCIP_JOB_INQUEUE
Definition: type_tpi.h:61
enum SCIP_Jobstatus SCIP_JOBSTATUS
Definition: type_tpi.h:65
@ SCIP_SUBMIT_SUCCESS
Definition: type_tpi.h:48
@ SCIP_SUBMIT_SHUTDOWN
Definition: type_tpi.h:47
@ SCIP_SUBMIT_QUEUEFULL
Definition: type_tpi.h:45
@ SCIP_SUBMIT_QUEUECLOSED
Definition: type_tpi.h:46