MHEG5  18.9.0
MHEG5 Documentation
glue_queue.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright © 2014 The DTVKit Open Software Foundation Ltd (www.dtvkit.org)
3  * Copyright © 2011 Ocean Blue Software Ltd
4  *
5  * This file is part of a DTVKit Software Component
6  * You are permitted to copy, modify or distribute this file subject to the terms
7  * of the DTVKit 1.0 Licence which can be found in licence.txt or at www.dtvkit.org
8  *
9  * THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
10  * EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
11  * OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
12  *
13  * If you or your organisation is not a member of DTVKit then you have access
14  * to this source code outside of the terms of the licence agreement
15  * and you are expected to delete this and any associated files immediately.
16  * Further information on DTVKit, membership and terms can be found at www.dtvkit.org
17  *******************************************************************************/
25 /*---includes for this file--------------------------------------------------*/
26 #include "techtype.h"
27 #include "stb_os.h"
28 #include "glue_memory.h"
29 #include "glue_debug.h"
30 #include "glue_queue.h"
31 
32 #define UP_MSGS_WHEN_Q_FULL
33 #ifndef NDEBUG
34 //#define MONITOR_STATS
35 #endif
36 #ifdef MONITOR_STATS
37 #define Q_STATS(s) s
38 #else
39 #define Q_STATS(s)
40 #endif
41 #define CRITICAL_Q_SIZE (1 << 3)
42 #define HIGH_Q_SIZE (1 << 3)
43 #define NORMAL_Q_SIZE (1 << 8)
44 #define LOW_Q_SIZE (1 << 4)
45 
46 /* High priority section buffer exclusion space as percentage *
47  * of total section queue size */
48 #define EXCLUSION_RATIO (35)
49 
50 typedef struct s_msg_queue
51 {
54 } S_MSG_QUEUE;
55 
56 typedef struct s_q_notify
57 {
58  struct s_q_notify *next;
62 } S_Q_NOTIFY;
63 
64 static void *vq_read_sema = 0;
65 static void *queue_mutex = 0;
66 
67 static U16BIT queue_size[PRTY_INVALID] =
69 
70 static S_MSG_QUEUE *msg_queue[PRTY_INVALID];
71 static E_PRIORITY urgent_priority = PRTY_CRITICAL;
72 static S_Q_NOTIFY *vq_notify_head = NULL;
73 #ifdef TRACING
74 static int show_yeild_failure = 0;
75 #endif
76 #ifdef MONITOR_STATS
77 static int put_sema_total = 0;
78 static int get_sema_total = 0;
79 static int put_msg_total[PRTY_INVALID] = {0, 0, 0, 0, 0};
80 static int get_msg_total[PRTY_INVALID] = {0, 0, 0, 0, 0};
81 #if defined(STACK_DEBUGGING)
82 void STB_OSTaskPrint(void);
83 #endif
84 #endif
85 
97 {
98  int i;
99  /* Check function parameters */
100  if (queue_mutex != 0)
101  {
103  }
104 
105  /* Create queue mutex */
106  queue_mutex = STB_OSCreateMutex();
107  if (queue_mutex == NULL)
108  {
109  TRACE(TERROR, ("failed to create mutex"));
110  return MHERR_INTERNAL;
111  }
112  vq_read_sema = STB_OSCreateCountSemaphore(0);
113  if (vq_read_sema == NULL)
114  {
115  TRACE(TERROR, ("failed to create semaphore"));
116  return MHERR_INTERNAL;
117  }
118  /* Calculate num of high priority section buffers */
119  for (i = 0; i != PRTY_INVALID; i++)
120  {
121  msg_queue[i] = SYS_Alloc( sizeof(S_MSG_QUEUE) + (queue_size[i] * sizeof(S_MhegMessage)));
122  if (msg_queue[i] == NULL)
123  {
124  STB_OSDeleteMutex(queue_mutex);
125  queue_mutex = 0;
127  }
128  msg_queue[i]->read_q_index = 0;
129  msg_queue[i]->write_q_index = 0;
130  }
131  return MHERR_OK;
132 }
133 
140 void VQ_Close(void)
141 {
142  void *q_mtx;
143  int i, nx;
144  S_MhegMessage *p_elem;
145  S_MSG_QUEUE *p_queue;
146  FUNCTION_START(x)
147  /* save queue_mutex and set to zero so that any other
148  * task will already detect that queue is closing/closed */
149  q_mtx = queue_mutex;
150  queue_mutex = 0;
151  if (q_mtx != 0)
152  {
153  /* now can close and release resources */
154  i = PRTY_INVALID;
155 
156  STB_OSMutexLock(q_mtx);
157  do
158  {
159  i--;
160  /* Need to free data in queue items */
161  p_queue = msg_queue[i];
162  while (p_queue->read_q_index != p_queue->write_q_index)
163  {
164  nx = p_queue->read_q_index;
165  p_elem = (S_MhegMessage *)(p_queue + 1);
166  p_elem += nx;
167  switch (p_elem->data_type)
168  {
169  default:
170  case DT_NONE: case DT_VALUE:
171  /*nothing to do*/
172  break;
173 
174  case DT_ALLOC:
175  MHEG5freeMem((void *)p_elem->data.content.data );
176  break;
177 
178  case DT_CONTENT:
179  if (p_elem->data.content.destroy)
180  {
181  p_elem->data.content.destroy( p_elem->data.content.fs_handle );
182  }
183  break;
184 
185  case DT_D_CONTENT:
186  if (p_elem->data.content.destroy)
187  {
188  p_elem->data.content.destroy( &p_elem->data.content );
189  }
190  break;
191 
192  case DT_F_CONTENT:
193  p_elem->proc_msg_func( &(p_elem->data.content));
194  break;
195 
196  case DT_S_CONTENT:
197  STR_DataFree( p_elem->data.content.data, p_elem->data.content.size );
198  break;
199  }
200  nx++;
201  if (nx == queue_size[i])
202  {
203  nx = 0;
204  }
205  p_queue->read_q_index = nx;
206  }
207  SYS_Free( p_queue );
208  }
209  while (i != 0);
210  STB_OSMutexUnlock(q_mtx);
211  STB_OSDeleteMutex(q_mtx);
212  STB_OSDeleteSemaphore(vq_read_sema);
213  vq_read_sema = 0;
214  }
215  FUNCTION_FINISH(x)
216 }
217 
218 static U16BIT SizeFree( E_PRIORITY priority )
219 {
220  U16BIT count;
221  FUNCTION_START(x)
222  assert( priority < PRTY_INVALID );
223  //TRACE(TQUEUE,("%d c=%d,sz=%d,r=%d,w=%d",priority,count,queue_size[priority],
224  // msg_queue[priority]->read_q_index,msg_queue[priority]->write_q_index))
225  if (msg_queue[priority]->read_q_index > msg_queue[priority]->write_q_index)
226  {
227  count = msg_queue[priority]->read_q_index - msg_queue[priority]->write_q_index;
228  }
229  else
230  {
231  count = queue_size[priority] + msg_queue[priority]->read_q_index - msg_queue[priority]->write_q_index;
232  }
233  FUNCTION_FINISH(x)
234  return count;
235 }
236 
249 {
250  E_MhegErr result = MHERR_OK;
251  S_MhegMessage *p_elmnt;
252  S_MSG_QUEUE *p_queue;
253  S_Q_NOTIFY *vqn;
254  U16BIT new_ndx;
255 
256  FUNCTION_START(x)
257  assert( priority < PRTY_INVALID );
258  if (queue_mutex == 0)
259  {
260  TRACE(TERROR, ("Queue not open"));
261  result = MHERR_COMP_NOT_OPEN;
262  }
263  else
264  {
265  p_queue = msg_queue[priority];
266  p_elmnt = (S_MhegMessage *)(p_queue + 1);
267 
268  STB_OSMutexLock(queue_mutex);
269 
270  new_ndx = p_queue->write_q_index;
271  p_elmnt += new_ndx;
272  new_ndx++;
273  if (new_ndx == queue_size[priority])
274  {
275  new_ndx = 0;
276  }
277  if (new_ndx == p_queue->read_q_index)
278  {
279  #ifdef MONITOR_STATS
280  int i, p = 0, g = 0;
281  for (i = 0; i != PRTY_INVALID; i++)
282  {
283  p += put_msg_total[i];
284  g += get_msg_total[i];
285  }
286  TRACE(TQUEUE, ("QQQ FULL sema=(%d,%d) qp=(%d,%d) all=(%d,%d) QQQ FULL",
287  put_sema_total, get_sema_total, put_msg_total[priority], get_msg_total[priority], p, g))
288  #else
289  TRACE(TWARN | TQUEUE, ("VGR QUEUE(%d) is FULL", priority))
290  #endif
291  #ifdef TRACING
292  show_yeild_failure = 1;
293  #endif
294  result = MHERR_QUEUE_FULL;
295  }
296  else
297  {
298  if (priority == PRTY_CRITICAL)
299  {
300  vqn = vq_notify_head;
301  while (vqn != NULL)
302  {
303  vqn->critical_rcvd_func();
304  vqn = vqn->next;
305  }
306  }
307  else if (priority != PRTY_LOW)
308  {
309  if (SizeFree(priority) < (queue_size[priority] >> 2))
310  {
311  urgent_priority = priority;
312  }
313  vqn = vq_notify_head;
314  while (vqn != NULL)
315  {
316  vqn->normal_rcvd_func();
317  vqn = vqn->next;
318  }
319  }
320  memcpy( p_elmnt, pMsg, sizeof(S_MhegMessage));
321  p_queue->write_q_index = new_ndx;
322  Q_STATS(put_msg_total[priority]++; )
323  Q_STATS(put_sema_total++; )
324  STB_OSSemaphoreSignal(vq_read_sema);
325  }
326  STB_OSMutexUnlock(queue_mutex);
327  }
328  FUNCTION_FINISH(x)
329  return result;
330 }
331 
342 {
343  E_MhegErr result = MHERR_INTERNAL;
344  int i;
345  S_MhegMessage *p_elem;
346  S_MSG_QUEUE *p_queue;
347  S_Q_NOTIFY *vqn;
348  U16BIT nxt_ndx;
349 
350 #ifdef MONITOR_STATS
351  U32BIT starting;
352  if (put_sema_total == get_sema_total)
353  {
354  starting = STB_OSGetClockMilliseconds();
355  }
356  else
357  {
358  starting = 0;
359  }
360 #endif
361  FUNCTION_START(x)
362  STB_OSSemaphoreWait(vq_read_sema);
363  Q_STATS(get_sema_total++; )
364 #ifdef MONITOR_STATS
365  if (starting != 0)
366  {
367  TRACE(TQUEUE, ("QQQ TTT sema=(%d,%d) time=%d TTT QQQ", put_sema_total, get_sema_total, STB_OSGetClockMilliseconds() - starting))
368  }
369 #endif
370 
371  STB_OSMutexLock(queue_mutex);
372  if (urgent_priority != PRTY_CRITICAL &&
373  msg_queue[PRTY_CRITICAL]->read_q_index != msg_queue[PRTY_CRITICAL]->write_q_index)
374  {
375  i = PRTY_CRITICAL;
376  }
377  else
378  {
379  i = urgent_priority;
380  }
381  for (; i != PRTY_INVALID; i++)
382  {
383  p_queue = msg_queue[i];
384  if (p_queue->read_q_index != p_queue->write_q_index)
385  {
386  nxt_ndx = p_queue->read_q_index;
387  p_elem = (S_MhegMessage *)(p_queue + 1);
388  p_elem += nxt_ndx;
389  nxt_ndx++;
390  if (nxt_ndx == queue_size[i])
391  {
392  nxt_ndx = 0;
393  }
394  p_queue->read_q_index = nxt_ndx;
395  memcpy( pMsg, p_elem, sizeof(S_MhegMessage));
396  if (i == 0)
397  {
398  vqn = vq_notify_head;
399  while (vqn != NULL)
400  {
401  vqn->critical_done_func();
402  vqn = vqn->next;
403  }
404  }
405  result = MHERR_OK;
406  Q_STATS(get_msg_total[i]++; )
407  break;
408  }
409  }
410  if (urgent_priority != PRTY_CRITICAL &&
411  SizeFree(urgent_priority) > ((queue_size[urgent_priority] * 3) >> 2))
412  {
413  urgent_priority = PRTY_CRITICAL;
414  }
415 #ifdef MONITOR_STATS
416  {
417  int p = 0, g = 0;
418  for (i = 0; i != PRTY_INVALID; i++)
419  {
420  p += put_msg_total[i];
421  g += get_msg_total[i];
422  }
423  if (put_sema_total - get_sema_total != p - g)
424  {
425  TRACE(TQUEUE, ("QQQ sema=(%d,%d) all=(%d,%d) QQQ", put_sema_total, get_sema_total, p, g))
426  }
427  }
428 #endif
429  STB_OSMutexUnlock(queue_mutex);
430  FUNCTION_FINISH(x)
431  return result;
432 }
433 
435 {
436  U16BIT count;
437  FUNCTION_START(x)
438  assert( priority < PRTY_INVALID );
439  if (queue_mutex == 0)
440  {
441  count = 0;
442  }
443  else
444  {
445  STB_OSMutexLock(queue_mutex);
446  count = SizeFree(priority);
447  STB_OSMutexUnlock(queue_mutex);
448  }
449  FUNCTION_FINISH(x)
450  return count;
451 }
452 
458 {
459  U16BIT priority;
460  BOOLEAN needed = FALSE;
461  FUNCTION_START(x)
462  if (queue_mutex != NULL)
463  {
464  STB_OSMutexLock(queue_mutex);
465  if (msg_queue[PRTY_CRITICAL]->read_q_index != msg_queue[PRTY_CRITICAL]->write_q_index)
466  {
467  /* have critical message - keypress stop/start or something */
468  TRACE(TQUEUE, ("Critical MESSAGE"))
469  needed = TRUE;
470  }
471  else
472  {
473  for (priority = PRTY_NORMAL; priority != PRTY_INVALID; priority++)
474  {
475  /* If space available on queue is less than 1/4 of total size,
476  * then need to process q's */
477  if (SizeFree(priority) < (queue_size[priority] >> 2))
478  {
479  urgent_priority = priority;
480  TRACE(TQUEUE, ("Priority %d low on space", priority))
481  needed = TRUE;
482  break;
483  }
484  }
485  }
486  STB_OSMutexUnlock(queue_mutex);
487  }
488 #ifdef TRACING
489  if (!needed && show_yeild_failure)
490  {
491  for (priority = PRTY_NORMAL; priority != PRTY_INVALID; priority++)
492  {
493  TRACE(TQUEUE, ("p=%d, fsz=%d qsz=%d", priority, SizeFree(priority), queue_size[priority] >> 2));
494  }
495  show_yeild_failure = 0;
496  }
497 #endif
498  FUNCTION_FINISH(x)
499  return needed;
500 }
501 
507 void* VQ_RegisterNotify( F_QueueNotify normal_rcvd, F_QueueNotify critical_rcvd,
508  F_QueueNotify critical_done )
509 
510 {
511  S_Q_NOTIFY *vqn;
512  FUNCTION_START(x)
513  vqn = SYS_Alloc( sizeof(S_Q_NOTIFY));
514  if (vqn != NULL)
515  {
516  vqn->critical_rcvd_func = critical_rcvd;
517  vqn->normal_rcvd_func = normal_rcvd;
518  vqn->critical_done_func = critical_done;
519  vqn->next = vq_notify_head;
520  vq_notify_head = vqn;
521  }
522  FUNCTION_FINISH(x)
523  return vqn;
524 }
525 
530 void VQ_UnRegisterNotify( void *qn )
531 {
532  S_Q_NOTIFY **ppvqn;
533  FUNCTION_START(x)
534  ppvqn = &vq_notify_head;
535  while (*ppvqn)
536  {
537  if (*ppvqn == qn)
538  {
539  *ppvqn = (*ppvqn)->next;
540  SYS_Free( qn );
541  break;
542  }
543  ppvqn = &((*ppvqn)->next);
544  }
545  FUNCTION_FINISH(x)
546 }
547 
U32BIT STB_OSGetClockMilliseconds(void)
Get Current Computer Clock Time.
void STB_OSSemaphoreWait(void *semaphore)
Wait on Semaphore Indefinity or Until Released.
void VQ_Close(void)
Close component control and section queue component. Destroys all allocated memory and resources for ...
Definition: glue_queue.c:140
U16BIT VQ_GetSizeFree(E_PRIORITY priority)
Get size available on a queue.
Definition: glue_queue.c:434
#define FUNCTION_FINISH(name)
Definition: glue_debug.h:143
F_MSG_PROCESS proc_msg_func
Definition: glue_queue.h:198
F_DESTROY destroy
Definition: fs_types.h:56
U32BIT size
Definition: fs_types.h:54
E_MhegErr VQ_GetMsg(S_MhegMessage *pMsg)
Get an event or section from the component queues. This is a blocking function.
Definition: glue_queue.c:341
F_QueueNotify normal_rcvd_func
Definition: glue_queue.c:59
Debug tracing.
void STB_OSDeleteMutex(void *mutex)
Delete a mutex.
MHEG5 queue.
U16BIT write_q_index
Definition: glue_queue.c:53
void(* F_QueueNotify)(void)
Definition: glue_queue.h:43
U16BIT read_q_index
Definition: glue_queue.c:52
E_MhegErr VQ_Open(S_MhegConfig *cfg_params)
Initialise component control and section queues. Allocates memory for, sets up and creates event (com...
Definition: glue_queue.c:96
#define Q_STATS(s)
Definition: glue_queue.c:39
#define HIGH_Q_SIZE
Definition: glue_queue.c:42
#define SYS_Free
Definition: glue_memory.h:85
E_MhegErr
Definition: mherrors.h:28
F_QueueNotify critical_rcvd_func
Definition: glue_queue.c:60
BOOLEAN VQ_EventNeedsProcessing(void)
Check whether any events on component queues needs processing.
Definition: glue_queue.c:457
struct s_q_notify S_Q_NOTIFY
Memory functions.
void STB_OSMutexUnlock(void *mutex)
Unlock a mutex (a.k.a. &#39;leave&#39;, &#39;signal&#39; or &#39;release&#39;)
#define MHEG5freeMem
Definition: glue_memory.h:94
void STR_DataFree(unsigned char *data, unsigned int size)
Definition: glue_memory.c:668
void * STB_OSCreateMutex(void)
Create a mutex.
E_DATA_TYPE data_type
Definition: glue_queue.h:199
void VQ_UnRegisterNotify(void *qn)
Definition: glue_queue.c:530
uint16_t U16BIT
Definition: techtype.h:84
System Wide Global Technical Data Type Definitions.
S_CONTENT content
Definition: glue_queue.h:202
F_QueueNotify critical_done_func
Definition: glue_queue.c:61
#define FALSE
Definition: techtype.h:68
struct s_q_notify * next
Definition: glue_queue.c:58
#define NORMAL_Q_SIZE
Definition: glue_queue.c:43
void STB_OSMutexLock(void *mutex)
Lock a mutex (a.k.a. &#39;enter&#39;, &#39;wait&#39; or &#39;get&#39;).
#define CRITICAL_Q_SIZE
Definition: glue_queue.c:41
E_MhegErr VQ_PutMsg(S_MhegMessage *pMsg, E_PRIORITY priority)
Post an event or section into component queues. Copies data into queue.
Definition: glue_queue.c:248
U8BIT * data
Definition: fs_types.h:55
U8BIT BOOLEAN
Definition: techtype.h:99
#define TRUE
Definition: techtype.h:69
E_PRIORITY
Definition: glue_queue.h:45
FS_HANDLE fs_handle
Definition: fs_types.h:57
void * VQ_RegisterNotify(F_QueueNotify normal_rcvd, F_QueueNotify critical_rcvd, F_QueueNotify critical_done)
Definition: glue_queue.c:507
#define LOW_Q_SIZE
Definition: glue_queue.c:44
union s_mhg_message::@13 data
#define FUNCTION_START(name)
Definition: glue_debug.h:142
uint32_t U32BIT
Definition: techtype.h:86
void * STB_OSCreateCountSemaphore(U32BIT value)
Create a counting semaphore.
#define SYS_Alloc
Definition: glue_memory.h:84
struct s_msg_queue S_MSG_QUEUE
void STB_OSDeleteSemaphore(void *semaphore)
Delete a Semaphore.
void STB_OSSemaphoreSignal(void *semaphore)
Signal a Semaphore to Release it by decrementing its counter.
#define TRACE(t, x)
Definition: glue_debug.h:118
Header file - Function prototypes for operating system.