MHEG5  18.9.0
MHEG5 Documentation
stmr_queue.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright © 2014 The DTVKit Open Software Foundation Ltd (www.dtvkit.org)
3  * Copyright © 2010 Ocean Blue Software Ltd
4  *
5  * This file is part of a DTVKit Software Component
6  * You are permitted to copy, modify or distribute this file subject to the terms
7  * of the DTVKit 1.0 Licence which can be found in licence.txt or at www.dtvkit.org
8  *
9  * THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
10  * EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
11  * OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
12  *
13  * If you or your organisation is not a member of DTVKit then you have access
14  * to this source code outside of the terms of the licence agreement
15  * and you are expected to delete this and any associated files immediately.
16  * Further information on DTVKit, membership and terms can be found at www.dtvkit.org
17  *******************************************************************************/
25 /*---includes for this file--------------------------------------------------*/
26 #include <string.h>
27 
28 #include "stb_os.h"
29 #include "glue_memory.h"
30 #include "glue_debug.h"
31 
32 #include "mh5profile.h"
33 #include "stmr_queue.h"
34 #include "stmr_common.h"
35 #include "stmr_task.h"
36 #include "stmr_dl.h"
37 /*---constant definitions for this file--------------------------------------*/
38 
39 #undef QDBG_FUNC
40 #define QDBG_CHECK
41 #undef QDBG_PRINT
42 #undef QDBG_ITEMS
43 
44 #define QDBG_ALLOC(x)
45 #define QDBG_FREE(x)
46 
47 #ifdef QDBG_FUNC
48 #define QDBG_FUNC_START(x) TRACE(TICS, (">> %s\n", #x))
49 #define QDBG_FUNC_END(x) TRACE(TICS, ("<< %s\n", #x))
50 #else
51 #define QDBG_FUNC_START(x)
52 #define QDBG_FUNC_END(x)
53 #endif
54 
55 #ifdef QDBG_CHECK
56 #define QUEUE_CHECK() QueueCheck()
57 #else
58 #define QUEUE_CHECK()
59 #endif /* QDBG_CHECK */
60 
61 #ifdef QDBG_PRINT
62 #define QUEUE_PRINT() QueuePrint()
63 #else
64 #define QUEUE_PRINT()
65 #endif /* QDBG_PRINT */
66 
67 #define PDBG(x) TRACE(TICS, x)
68 #define DBG(x)
69 
70 
71 /*---local typedef structs for this file-------------------------------------*/
72 
73 #define MIN_THRESHOLD_SIZE 1048576
74 
75 /*---local function definitions----------------------------------------------*/
76 
77 /*---global function definitions---------------------------------------------*/
78 
79 void SendSignalToTask(void);
80 #ifdef TO_REMOVE
81 BOOLEAN IsDataAvailableForStreaming(void);
82 #endif /* TO_REMOVE */
83 
84 static void* QueueAlloc(U32BIT size);
85 static void QueueFree(void *block);
86 #ifdef QDBG_CHECK
87 static void QueueCheck(void);
88 #endif
89 #ifdef QDBG_PRINT
90 static void QueuePrint(void);
91 #endif
92 
93 static U32BIT QueueReleaseRequestItems(MHEG5QueueItem **head,
94  MHEG5QueueItem **tail,
96 static void QueueReleaseAllItems(MHEG5QueueItem **head, MHEG5QueueItem **tail);
97 
98 /*---local (static) variable declarations for this file----------------------*/
99 static U32BIT queueAlign;
100 static U32BIT queueAlignMask;
101 static U8BIT *queueMemPool;
102 static U32BIT queueMemPoolSize;
103 static U32BIT queueMinBlockSize;
104 static U32BIT queueLastOffset;
105 static U32BIT queueThresholdSize;
106 
107 static MHEG5QueueItem *queueItemHead = NULL;
108 static MHEG5QueueItem *queueItemTail = NULL;
109 
110 static void *queueMutex = NULL;
111 static BOOLEAN queueStreamingEnabled = FALSE;
112 static U32BIT queueStreamingRequestId = 0;
113 static MHEG5QueueItem queueInvalidItem;
114 static U32BIT queueBufferedBytes;
115 
116 static void (*insertCallback)(U32BIT downloadId, U64BIT base,
117  U64BIT position, U32BIT len, BOOLEAN last) = 0;
118 static void (*releaseCallback)(U32BIT downloadId, U64BIT base,
119  U64BIT position, U32BIT len, BOOLEAN last) = 0;
120 
121 
129 {
130  E_MhegErr result;
131  U32BIT size;
132 
133  queueMutex = STB_OSCreateMutex();
134  if (queueMutex == NULL)
135  {
136  TRACE(TERROR, ("Streaming queue mutex failure"));
138  }
139  else
140  {
141  queueItemHead = NULL;
142  queueItemTail = NULL;
143 
144  queueBufferedBytes = 0;
145 
146  /* Queue manager is a mini memory manager. It uses the provided buffer
147  * as the memory area to allocate memory from. It's a general purpose
148  * memory manager, used for items as well as data blocks.
149  *
150  * Used blocks hold the size at the start of the block. The block
151  * size is always a multiple of 4, so the two LSBs are "unused" for the
152  * size. So the memory manager uses them to find out whether the
153  * previous block is used (bit 0) and whether the current block is used
154  * (bit 1).
155  *
156  * Unused blocks also contain the size at the end of the block.
157  *
158  * Used block: | size | this used | prev used | data |
159  * Unused block: | size | this used | prev used | empty | size |
160  *
161  * This structure allows unused blocks to be merged, while keeping
162  * overhead to a minimum.
163  *
164  * Initial setting is two blocks:
165  * 1. One big block at the start with "this used" false and "prev used"
166  * true (so that it is never merged with the previous block, which
167  * doesn't exist)
168  * 2. One small block at the end with "this used" true and "prev used"
169  * false (so that it is never merged with the large block).
170  *
171  * This allows allocation / free functions to be generic without worrying
172  * about first / last block.
173  */
174 
175  queueAlign = sizeof(U32BIT) * 2;
176  queueAlignMask = ~(queueAlign - 1);
177  queueMemPool = buffer;
178  queueMemPoolSize = bufferSize & queueAlignMask;
179  queueMinBlockSize = 2 * sizeof(U32BIT);
180  queueLastOffset = 0;
181  if (bufferSize < MIN_THRESHOLD_SIZE)
182  {
183  queueThresholdSize = bufferSize;
184  }
185  else
186  {
187  queueThresholdSize = MIN_THRESHOLD_SIZE + ((bufferSize-MIN_THRESHOLD_SIZE)/2);
188  }
189  result = MHERR_OK;
190  if (queueMemPoolSize >= 2 * queueMinBlockSize)
191  {
192  /* Create two blocks - one "real" to use for allocations, and one
193  * "fake" as a sentinel at the end of the buffer
194  */
195  size = queueMemPoolSize - queueMinBlockSize;
196  assert((size & 0x3) == 0);
197 
198  /* The first block is unused, so the size is copied to the end of the
199  * block
200  */
201  *((U32BIT *)(queueMemPool + size - sizeof(U32BIT))) = size;
202 
203  /* The "previous" block is marked as "used". This stops any attempts to
204  * merge the block with the non-existent previous block. Current block
205  * is not used.
206  */
207  size |= 0x1;
208  *((U32BIT *)queueMemPool) = size;
209 
210  /* We mark the sentinel block as "used" to stop any attempts to merge
211  * it with previous blocks. This allows the allocation functions never
212  * to worry about first/last blocks. The previous block is marked as
213  * "unused".
214  */
215  *((U32BIT *)(queueMemPool + (size & ~0x3))) = queueMinBlockSize | 0x2;
216  }
217  else
218  {
219  /* Buffer too small */
220  DBG(("QueueInit: Buffer too small"));
221  queueMemPool = NULL;
222  queueMemPoolSize = 0;
223 
224  #ifdef ERROR_ON_SMALL_ICS_BUFFER
225  STB_OSDeleteMutex( queueMutex );
226  queueMutex = NULL;
227  result = MHERR_BAD_ICS_BUFFER_SIZE;
228  #endif
229  }
230  }
231  return result;
232 }
233 
242 {
243  MHEG5QueueItem *item;
244 
246 
247  STB_OSMutexLock(queueMutex);
248 
249  item = QueueAlloc(sizeof *item);
250  if (item != NULL)
251  {
252  item->data = QueueAlloc(len);
253  if (item->data != NULL)
254  {
255  item->requestId = requestId;
256  item->len = 0;
257  item->offset = 0;
258  item->referenceCount = 1;
259  }
260  else
261  {
262  QueueFree(item);
263  item = NULL;
264  }
265  }
266 
267  STB_OSMutexUnlock(queueMutex);
268 
270 
271  return item;
272 }
273 
280 {
282 
283  STB_OSMutexLock(queueMutex);
284 
285  item->next = NULL;
286  if (queueItemHead == NULL)
287  {
288  assert(queueItemTail == NULL);
289  queueItemHead = item;
290  queueItemTail = item;
291  }
292  else
293  {
294  assert(queueItemTail != NULL);
295  queueItemTail->next = item;
296  queueItemTail = item;
297  }
298 
299  queueBufferedBytes += item->len;
300  DBG(("queueBufferedBytes = %d [1]", queueBufferedBytes));
301 
302  #ifdef QDBG_ITEMS
303  {
304  MHEG5QueueItem *item = queueItemHead;
305 
306  while (item != NULL)
307  {
308  DBG(("%p (%p,%d) -> ", item, item->data, item->len));
309  item = item->next;
310  }
311  DBG(("nil"));
312 
313  DBG(("tail = %p", queueItemTail));
314  }
315  #endif
316 
317  STB_OSMutexUnlock(queueMutex);
318 
319  if (insertCallback != NULL)
320  {
321  /* Notify item insertion (queueMutex should be unlocked) */
322  insertCallback(item->downloadId, item->base, item->position,
323  item->len, item->last);
324  }
325 
327 }
328 
337 {
338  MHEG5QueueItem *item;
339 
341 
342  STB_OSMutexLock(queueMutex);
343 
344  if (queueStreamingEnabled)
345  {
346  item = queueItemHead;
347  if (item != NULL)
348  {
349  if (item->requestId == queueStreamingRequestId)
350  {
351  /* Item can be released */
352  ++item->referenceCount;
353  }
354  else
355  {
356  /* Item belongs to another stream, pretend that
357  * the queue is empty.
358  */
359  item = NULL;
360  }
361  }
362  }
363  else
364  {
365  /* Either streaming is not allowed, or there's not enough data
366  * for streaming (or both).
367  */
368  item = &queueInvalidItem;
369  }
370 
371  STB_OSMutexUnlock(queueMutex);
372 
374 
375  return item;
376 }
377 
385 {
386  BOOLEAN valid = TRUE;
387 
388  if (item == &queueInvalidItem)
389  {
390  valid = FALSE;
391  }
392 
393  return valid;
394 }
395 
407 {
408  BOOLEAN call;
409  U32BIT requestId = 0;
410  U64BIT base;
411  U64BIT position;
412  U32BIT len = 0;
413  BOOLEAN last = FALSE;
414 
416 
417  STB_OSMutexLock(queueMutex);
418 
419  call = FALSE;
420  --item->referenceCount;
421 
422  /* If the item is in the linked list, it's at the head of the list */
423  if (item == queueItemHead)
424  {
425  item->offset += processed;
426  }
427  else
428  {
429  /* Updating causes the item to be released */
430  call = TRUE;
431 
432  requestId = item->requestId;
433  base = item->base;
434  position = item->position;
435  len = item->len;
436  last = item->last;
437 
438  /* The item is not in the queue, all references are lost and it
439  * must be freed.
440  */
441  QueueFree(item->data);
442  QueueFree(item);
443  }
444 
445  #ifdef QDBG_ITEMS
446  {
447  MHEG5QueueItem *item = queueItemHead;
448 
449  while (item != NULL)
450  {
451  DBG(("%p (%p,%d) -> ", item, item->data, item->len));
452  item = item->next;
453  }
454  DBG(("nil"));
455 
456  DBG(("tail = %p", queueItemTail));
457  }
458  #endif
459 
460  STB_OSMutexUnlock(queueMutex);
461 
462  if (call && releaseCallback != NULL)
463  {
464  releaseCallback(requestId, base, position, len, last);
465  }
466 
468 }
469 
478 {
480  U64BIT base;
481  U64BIT position;
482  U32BIT len;
483  BOOLEAN last, resume;
484 
486 
487  STB_OSMutexLock(queueMutex);
488 
489  /* Item information is passed to the callback */
490  requestId = item->requestId;
491  base = item->base;
492  position = item->position;
493  len = item->len;
494  last = item->last;
495  resume = FALSE;
496 
497  --item->referenceCount;
498 
499  /* If the item is in the linked list, it's at the head of the list */
500  if (item == queueItemHead)
501  {
502  /* Remove item from item list */
503  queueItemHead = queueItemHead->next;
504  if (item == queueItemTail)
505  {
506  assert(queueItemHead == NULL);
507  queueItemTail = NULL;
508  }
509 
510  queueBufferedBytes -= item->len;
511  DBG(("queueBufferedBytes = %d [2]", queueBufferedBytes));
512 
513  if (queueBufferedBytes < queueThresholdSize)
514  {
515  resume = TRUE;
516  }
517  /* The item is freed only if it's not currently used by the task */
518  --item->referenceCount;
519  if (item->referenceCount == 0)
520  {
521  QueueFree(item->data);
522  QueueFree(item);
523  }
524  }
525  else
526  {
527  PDBG(("Qbytes=%u, item->referenceCount=%d, item->len=%u", queueBufferedBytes, item->referenceCount, item->len));
528  if (item->referenceCount <= 1)
529  {
530  assert(item->referenceCount == 0);
531  QueueFree(item->data);
532  QueueFree(item);
533  }
534  }
535 
536  #ifdef QDBG_ITEMS
537  {
538  MHEG5QueueItem *item = queueItemHead;
539 
540  while (item != NULL)
541  {
542  DBG(("%p (%p,%d) -> ", item, item->data, item->len));
543  item = item->next;
544  }
545  DBG(("nil"));
546 
547  DBG(("tail = %p", queueItemTail));
548  }
549  #endif
550 
551  STB_OSMutexUnlock(queueMutex);
552 
553  if (releaseCallback != NULL)
554  {
555  /* Notify item released (queueMutex should be unlocked) */
556  releaseCallback(requestId, base, position, len, last);
557  }
558  if (resume)
559  {
561  }
562 
564 }
565 
571 {
573 
575 
576  return queueBufferedBytes;
577 }
578 
585 {
586  BOOLEAN enabled;
587 
589 
590  STB_OSMutexLock(queueMutex);
591  PDBG(("QueueEnableStreaming"));
592  enabled = queueStreamingEnabled;
593  queueStreamingEnabled = TRUE;
594  queueStreamingRequestId = requestId;
595  if (!enabled)
596  {
597  /* Also signal to the task that reading was enabled */
599  }
600  STB_OSMutexUnlock(queueMutex);
601 
603 }
604 
610 {
612 
613  STB_OSMutexLock(queueMutex);
614  PDBG(("QueueDisableStreaming"));
615  queueStreamingEnabled = FALSE;
616  queueStreamingRequestId = 0;
617  STB_OSMutexUnlock(queueMutex);
618 
620 }
621 
627 {
628  return queueStreamingEnabled;
629 }
630 
637 {
638  BOOLEAN resume;
640 
641  STB_OSMutexLock(queueMutex);
642 
643  queueBufferedBytes -= QueueReleaseRequestItems(&queueItemHead,
644  &queueItemTail,
645  requestId);
646 
647  resume = (queueBufferedBytes < queueThresholdSize)? TRUE : FALSE;
648 
649  #ifdef QDBG_ITEMS
650  {
651  MHEG5QueueItem *item = queueItemHead;
652 
653  while (item != NULL)
654  {
655  DBG(("%p (%p,%d) -> ", item, item->data, item->len));
656  item = item->next;
657  }
658  DBG(("nil"));
659 
660  DBG(("tail = %p", queueItemTail));
661  }
662  #endif
663 
664  STB_OSMutexUnlock(queueMutex);
665 
666  if (resume)
667  {
669  }
670 
672 }
673 
681 {
682  MHEG5QueueItem **pItem, *item;
683  U64BIT pos;
684  BOOLEAN resume;
686 
687  STB_OSMutexLock(queueMutex);
688  pItem = &queueItemHead;
689  while (*pItem != NULL)
690  {
691  item = *pItem;
692  if (item->requestId != requestId || ULL_Compare(item->position,marker) > 0)
693  {
694  pItem = &item->next;
695  }
696  else
697  {
698  pos = item->position;
699  ULL_Add32(pos,item->len);
700  if (ULL_Compare(pos,marker) <= 0)
701  {
702  *pItem = item->next;
703  queueBufferedBytes -= item->len;
704  --item->referenceCount;
705  if (item->referenceCount == 0)
706  {
707  QueueFree(item->data);
708  QueueFree(item);
709  }
710  }
711  else
712  {
713  pos = marker;
714  ULL_Sub(pos,item->position);
715  item->offset = ULL_Get32(pos);
716  pItem = &item->next;
717  }
718  }
719  }
720  resume = (queueBufferedBytes < queueThresholdSize)? TRUE : FALSE;
721 
722  /* Find the tail of the list */
723  item = queueItemHead;
724  if (item == NULL)
725  {
726  queueItemTail = NULL;
727  }
728  else
729  {
730  while (item->next != NULL)
731  {
732  item = item->next;
733  }
734  queueItemTail = item;
735  }
736  STB_OSMutexUnlock(queueMutex);
737 
738  if (resume)
739  {
741  }
743 }
744 
750 {
752 
753  STB_OSMutexLock(queueMutex);
754 
755  QueueReleaseAllItems(&queueItemHead, &queueItemTail);
756  queueBufferedBytes = 0;
757 
758  DBG(("queueBufferedBytes = 0 [4]"));
759 
760  STB_OSMutexUnlock(queueMutex);
761 
763 }
764 
772  U64BIT base,
773  U64BIT position,
774  U32BIT len,
775  BOOLEAN last))
776 {
777  insertCallback = callback;
778 }
779 
787  U64BIT base,
788  U64BIT position,
789  U32BIT len,
790  BOOLEAN last))
791 {
792  releaseCallback = callback;
793 }
794 
799 void MHEG5QueueTerm(void)
800 {
801  void *mutex;
802 
803  STB_OSMutexLock(queueMutex);
804 
805  queueMemPool = NULL;
806  queueMemPoolSize = 0;
807 
808  mutex = queueMutex;
809  queueMutex = NULL;
810 
811  STB_OSMutexUnlock(mutex);
812  STB_OSDeleteMutex(mutex);
813 }
814 
820 static void* QueueAlloc(U32BIT size)
821 {
822  U32BIT offset;
823  U32BIT blockHeader;
824  U32BIT blockSize;
825  U32BIT requestSize;
826  U32BIT newHeader;
827  U8BIT *newHeaderPtr;
828  void *block;
829 
830  QDBG_ALLOC((">> QueueAlloc(%d)\n", size));
831 
832  block = NULL;
833 
834  /* Make sure the size uses the alignment */
835  requestSize = (size + sizeof(U32BIT) + queueAlign - 1) & queueAlignMask;
836  QDBG_ALLOC(("size = %d, requestSize = %d\n", size, requestSize));
837 
838  QDBG_ALLOC(("queueLastOffset = %d\n", queueLastOffset));
839  offset = queueLastOffset;
840  while (block == NULL && offset < queueMemPoolSize)
841  {
842  blockHeader = *((U32BIT *)(queueMemPool + offset));
843  blockSize = blockHeader & ~0x3;
844  if ((blockHeader & 0x2) || (blockSize < requestSize))
845  {
846  /* Current block is used, or not large enough to satisfy the
847  * request. Move on to the next block.
848  */
849  QDBG_ALLOC(("Nothing at %d, trying next offset %d\n",
850  offset, offset + blockSize));
851  offset += blockSize;
852  if (offset == queueMemPoolSize)
853  {
854  /* Back to start of the buffer */
855  offset = 0;
856  }
857  if (offset == queueLastOffset)
858  {
859  /* Back to where we started, all blocks are used */
860  break;
861  }
862  }
863  else
864  {
865  /* Found a suitable block - take only what we need */
866  QDBG_ALLOC(("Found a block - blockSize = %d\n", blockSize));
867  if (blockSize - requestSize >= queueMinBlockSize)
868  {
869  /* Split the block into two */
870  QDBG_ALLOC(("Need to split the block\n"));
871  newHeader = requestSize;
872  /* Copy "previous used" flag from original header */
873  newHeader |= blockHeader & 0x1;
874  /* Current block is marked as "used" */
875  newHeader |= 0x2;
876  /* Write the header into the buffer */
877  *((U32BIT *)(queueMemPool + offset)) = newHeader;
878 
879  /* Next block is requestSize bytes away */
880  newHeaderPtr = queueMemPool + offset + requestSize;
881  newHeader = blockSize - requestSize;
882  /* Update block size at the end of the unused block */
883  *((U32BIT *)(newHeaderPtr + newHeader - sizeof(U32BIT))) = newHeader;
884  /* "previous used" is 1, we've just created it. "current used"
885  * is 0 because we started with an unused block. */
886  newHeader |= 0x1;
887  /* Write the header into the buffer */
888  *((U32BIT *)newHeaderPtr) = newHeader;
889 
890  /* Note that the block after that doesn't have to change. It's
891  * "previous used" flag is 0 (because the block was unused),
892  * and because we've split the block into two, that's still the
893  * case.
894  */
895 
896  QUEUE_CHECK();
897  }
898  else
899  {
900  /* Use the entire block */
901  QDBG_ALLOC(("Use entire block\n"));
902  requestSize = blockSize;
903  newHeader = requestSize;
904  /* Copy "previous used" flag from original header */
905  newHeader |= blockHeader & 0x1;
906  /* Current block is marked as "used" */
907  newHeader |= 0x2;
908  /* Write the header into the buffer */
909  *((U32BIT *)(queueMemPool + offset)) = newHeader;
910 
911  /* Next block's "previous used" flag must be updated to
912  * reflect the fact that we've allocate a block
913  */
914  *((U32BIT *)(queueMemPool + offset + requestSize)) |= 0x1;
915 
916  QUEUE_CHECK();
917  }
918 
919  block = queueMemPool + offset + sizeof(U32BIT);
920 
921  /* Look for the next block from the end of this block onwards */
922  queueLastOffset = offset + requestSize;
923  }
924  }
925 
926  QDBG_ALLOC(("<< QueueAlloc() -> %p\n", block));
927 
928  return block;
929 }
930 
936 static void QueueFree(void *block)
937 {
938  U8BIT *blockHeaderPtr;
939  U8BIT *nextBlockHeaderPtr;
940  U8BIT *prevBlockHeaderPtr;
941  U32BIT blockHeader;
942  U32BIT blockOffset;
943  U32BIT blockSize;
944  U32BIT nextBlockSize;
945  U32BIT prevBlockSize;
946  U32BIT prevBlockHeader;
947 
948  QDBG_FREE((">> QueueFree(%p)\n", block));
949 
950  /* Freeing a block is done in stages. There are 4 possible scenarios
951  * where previous/next block are used/unused. Instead of checking each
952  * scenario separately, we just follow the following (slightly inefficient)
953  * sequence:
954  * 1. Free the block without any merging
955  * 2. If the next block is unused, merge with next block.
956  * 3. If the previous block is unused, merge with previous block.
957  */
958 
959  blockOffset = (U8BIT *)block - sizeof(U32BIT) - queueMemPool;
960  assert(blockOffset < queueMemPoolSize);
961 
962  blockHeaderPtr = queueMemPool + blockOffset;
963  blockHeader = *((U32BIT *)blockHeaderPtr);
964  blockSize = blockHeader & ~0x3;
965 
966  /* Make sure the block is marked as "used" */
967  assert(blockHeader & 0x2);
968 
969  /* Mark current block as "unused" */
970  blockHeader &= ~0x2;
971  *((U32BIT *)blockHeaderPtr) = blockHeader;
972 
973  /* Copy block size to the end of the block */
974  *((U32BIT *)(blockHeaderPtr + blockSize - sizeof(U32BIT))) = blockSize;
975 
976  /* Update next block */
977  nextBlockHeaderPtr = blockHeaderPtr + blockSize;
978  *((U32BIT *)nextBlockHeaderPtr) &= ~0x1;
979 
980  QUEUE_CHECK();
981 
982  if ((*((U32BIT *)nextBlockHeaderPtr) & 0x2) == 0)
983  {
984  /* Next block is unused, merge with next block */
985  nextBlockSize = *((U32BIT *)nextBlockHeaderPtr) & ~0x3;
986 
987  /* Update current header */
988  blockSize += nextBlockSize;
989  blockHeader = blockSize | (blockHeader & 0x3);
990  *((U32BIT *)blockHeaderPtr) = blockHeader;
991 
992  /* Copy next block size to the end of the block */
993  *((U32BIT *)(blockHeaderPtr + blockSize - sizeof(U32BIT))) = blockSize;
994 
995  /* No need to update the block after that, as it already indicates
996  * that its previous block is unused.
997  */
998 
999  if (queueLastOffset == nextBlockHeaderPtr - queueMemPool)
1000  {
1001  /* Last offset is invalid, make it valid again */
1002  queueLastOffset = blockOffset;
1003  }
1004  QUEUE_CHECK();
1005  }
1006 
1007  if ((blockHeader & 0x1) == 0)
1008  {
1009  /* Previous block is unused, merge with current block */
1010  prevBlockSize = *((U32BIT *)(blockHeaderPtr - sizeof(U32BIT)));
1011  prevBlockHeaderPtr = blockHeaderPtr - prevBlockSize;
1012  prevBlockHeader = *((U32BIT *)prevBlockHeaderPtr);
1013 
1014  /* Update previous block header */
1015  prevBlockSize += blockSize;
1016  prevBlockHeader = prevBlockSize | (prevBlockHeader & 0x3);
1017  *((U32BIT *)prevBlockHeaderPtr) = prevBlockHeader;
1018 
1019  /* Copy previous block size to the end of the block */
1020  *((U32BIT *)(prevBlockHeaderPtr + prevBlockSize - sizeof(U32BIT))) = prevBlockSize;
1021 
1022  if (queueLastOffset == blockOffset)
1023  {
1024  /* Last offset is invalid, make it valid again */
1025  queueLastOffset = prevBlockHeaderPtr - queueMemPool;
1026  }
1027  QUEUE_CHECK();
1028  }
1029 
1030  QDBG_FREE(("<< QueueFree()\n"));
1031 }
1032 
1033 #ifdef QDBG_CHECK
1034 
1038 static void QueueCheck(void)
1039 {
1040  U32BIT offset;
1041  U32BIT size;
1042  U32BIT prevUsed, currUsed, realPrevUsed;
1043  U32BIT lastWord;
1044  BOOLEAN seenLastOffset;
1045 
1046  QUEUE_PRINT();
1047 
1048  offset = 0;
1049  realPrevUsed = 1;
1050  seenLastOffset = FALSE;
1051 
1052  while (offset < queueMemPoolSize)
1053  {
1054  if (queueLastOffset == offset)
1055  {
1056  seenLastOffset = TRUE;
1057  }
1058  size = *((U32BIT *)(queueMemPool + offset));
1059  prevUsed = size & 0x1;
1060  currUsed = (size >> 1) & 0x1;
1061  size &= ~0x3;
1062  assert(offset + size <= queueMemPoolSize);
1063  assert(prevUsed == realPrevUsed);
1064  realPrevUsed = currUsed;
1065  lastWord = *((U32BIT *)(queueMemPool + offset + size - sizeof(U32BIT)));
1066  assert(currUsed || lastWord == size);
1067  offset += size;
1068  }
1069 
1070  assert(seenLastOffset);
1071 }
1072 
1073 #endif /* QDBG_CHECK */
1074 
1075 
1076 #ifdef QDBG_PRINT
1077 
1081 static void QueuePrint(void)
1082 {
1083  U32BIT offset;
1084  U32BIT size;
1085  U32BIT prevUsed, currUsed;
1086  U32BIT lastWord;
1087 
1088  DBG((">> QueuePrint"));
1089 
1090  offset = 0;
1091  while (offset < queueMemPoolSize)
1092  {
1093  size = *((U32BIT *)(queueMemPool + offset));
1094  prevUsed = size & 0x1;
1095  currUsed = (size >> 1) & 0x1;
1096  size &= ~0x3;
1097  lastWord = *((U32BIT *)(queueMemPool + offset + size - sizeof(U32BIT)));
1098  DBG(("%p: offset=%d, size=%d, prevUsed=%d, currUsed=%d, lastWord=%d",
1099  queueMemPool + offset + sizeof(U32BIT),
1100  offset, size, prevUsed, currUsed, lastWord));
1101  offset += size;
1102  }
1103 
1104  DBG(("<< QueuePrint"));
1105 }
1106 
1107 #endif /* QDBG_PRINT */
1108 
1109 
1118 static U32BIT QueueReleaseRequestItems(MHEG5QueueItem **head,
1119  MHEG5QueueItem **tail,
1120  U32BIT requestId)
1121 {
1122  MHEG5QueueItem **pItem, *item;
1123  U32BIT released;
1124 
1125  QDBG_FUNC_START(QueueReleaseRequestItems);
1126 
1127  released = 0;
1128 
1129  pItem = head;
1130  while (*pItem != NULL)
1131  {
1132  item = *pItem;
1133  if (item->requestId == requestId)
1134  {
1135  *pItem = item->next;
1136 
1137  released += item->len;
1138 
1139  /* The item is freed only if it's not currently used by the task */
1140  --item->referenceCount;
1141  if (item->referenceCount == 0)
1142  {
1143  QueueFree(item->data);
1144  QueueFree(item);
1145  }
1146  }
1147  else
1148  {
1149  pItem = &item->next;
1150  }
1151  }
1152 
1153  /* Find the tail of the list */
1154  *tail = *head;
1155  while (*tail != NULL && (*tail)->next != NULL)
1156  {
1157  *tail = (*tail)->next;
1158  }
1159 
1160  QDBG_FUNC_END(QueueReleaseRequestItems);
1161 
1162  return released;
1163 }
1164 
1171 static void QueueReleaseAllItems(MHEG5QueueItem **head, MHEG5QueueItem **tail)
1172 {
1173  MHEG5QueueItem *item, *nextItem;
1174 
1175  QDBG_FUNC_START(QueueReleaseAllItems);
1176 
1177  item = *head;
1178  while (item != NULL)
1179  {
1180  nextItem = item->next;
1181 
1182  /* The item is freed only if it's not currently used by the task */
1183  --item->referenceCount;
1184  if (item->referenceCount == 0)
1185  {
1186  QueueFree(item->data);
1187  QueueFree(item);
1188  }
1189 
1190  item = nextItem;
1191  }
1192 
1193  *head = NULL;
1194  *tail = NULL;
1195 
1196  QDBG_FUNC_END(QueueReleaseAllItems);
1197 }
1198 
#define MIN_THRESHOLD_SIZE
Definition: stmr_queue.c:73
Task functions for IC Streamer.
U32BIT downloadId
Definition: stmr_msp.c:72
#define ULL_Sub(variable, value)
Definition: glue_ulong.h:82
Common header internal to IC streamer.
#define QUEUE_PRINT()
Definition: stmr_queue.c:64
#define QDBG_FUNC_END(x)
Definition: stmr_queue.c:52
Debug tracing.
void STB_OSDeleteMutex(void *mutex)
Delete a mutex.
#define DBG(x)
Definition: stmr_queue.c:68
#define QDBG_FREE(x)
Definition: stmr_queue.c:45
void MHEG5QueueUpdateItem(MHEG5QueueItem *item, U32BIT processed)
Update the number of bytes processed in a queue item. The same item will be returned by the next call...
Definition: stmr_queue.c:406
BOOLEAN MHEG5QueueIsValidItem(MHEG5QueueItem *item)
Tell whether the item returned by MHEG5QueueGetHeadItem is valid. If the item is invalid, then streaming is disabled.
Definition: stmr_queue.c:384
void MHEG5QueueRegisterReleaseCallback(void(*callback)(U32BIT requestId, U64BIT base, U64BIT position, U32BIT len, BOOLEAN last))
Register notification callback for item release events. If a callback is already registered for the e...
Definition: stmr_queue.c:786
E_MhegErr
Definition: mherrors.h:28
struct sMHEG5QueueItem * next
Definition: stmr_queue.h:48
U32BIT MHEG5QueueGetBufferedBytes(void)
Return number of buffered bytes in the queue (regardless of request)
Definition: stmr_queue.c:570
U16BIT referenceCount
Definition: stmr_queue.h:58
uint8_t U8BIT
Definition: techtype.h:82
U64BIT position
Definition: stmr_queue.h:54
void MHEG5QueueTerm(void)
Terminate queue manager.
Definition: stmr_queue.c:799
#define QDBG_FUNC_START(x)
Definition: stmr_queue.c:51
Memory functions.
#define QDBG_ALLOC(x)
Definition: stmr_queue.c:44
void STB_OSMutexUnlock(void *mutex)
Unlock a mutex (a.k.a. 'leave', 'signal' or 'release')
IC Streamer queue manager.
This file defines the profile for the MHEG engine.
#define PDBG(x)
Definition: stmr_queue.c:67
void * STB_OSCreateMutex(void)
Create a mutex.
void MHEG5QueueReleaseItems(U32BIT requestId, U64BIT marker)
Release queue items that are related to a given request up to a marker.
Definition: stmr_queue.c:680
void MHEG5StreamerSendSignalToTask(void)
Send signal to streamer task to wake it up (if it's asleep)
Definition: stmr_task.c:122
MHEG5QueueItem * MHEG5QueueGetHeadItem(void)
Return the item at the head of the queue (the next item to consume). The function returns an "invalid...
Definition: stmr_queue.c:336
int len
Definition: mh5gate.c:57
void MHEG5QueueInsertItem(MHEG5QueueItem *item)
Insert a queue item into the queue.
Definition: stmr_queue.c:279
#define QUEUE_CHECK()
Definition: stmr_queue.c:56
BOOLEAN last
Definition: stmr_queue.h:57
void MHEG5QueueEnableStreaming(U32BIT requestId)
Allow streaming data from the queue.
Definition: stmr_queue.c:584
U32BIT requestId
Definition: stmr_queue.h:49
void MHEG5QueueRegisterInsertCallback(void(*callback)(U32BIT downloadId, U64BIT base, U64BIT position, U32BIT len, BOOLEAN last))
Register notification callback for item insertion event. If a callback is already registered for the ...
Definition: stmr_queue.c:771
BOOLEAN MHEG5QueueIsStreamingEnabled(void)
Tell whether streaming is enabled.
Definition: stmr_queue.c:626
Definition: mg_png.c:52
MHEG5QueueItem * MHEG5QueueAllocItem(U32BIT requestId, U32BIT len)
Allocate a new queue item and initialise with request ID and block length.
Definition: stmr_queue.c:241
#define FALSE
Definition: techtype.h:68
#define ULL_Compare(first, second)
Definition: glue_ulong.h:93
void STB_OSMutexLock(void *mutex)
Lock a mutex (a.k.a. 'enter', 'wait' or 'get').
E_MhegErr MHEG5QueueInit(U8BIT *buffer, U32BIT bufferSize)
Initialise queue manager.
Definition: stmr_queue.c:128
void MHEG5QueueReleaseRequestItems(U32BIT requestId)
Release all queue items that are related to a given request.
Definition: stmr_queue.c:636
void MHEG5QueueReleaseItem(MHEG5QueueItem *item)
Release an item from the queue. If the item is no longer in the queue (because the queue has been cle...
Definition: stmr_queue.c:477
void SendSignalToTask(void)
U8BIT BOOLEAN
Definition: techtype.h:99
U8BIT * data
Definition: stmr_queue.h:51
U32BIT downloadId
Definition: stmr_queue.h:50
#define TRUE
Definition: techtype.h:69
U32BIT requestId
Definition: stmr_msp.c:75
void MHEG5QueueReleaseAllItems(void)
Release all queue items - clear the queue completely.
Definition: stmr_queue.c:749
void MHEG5ResumeDownload(void)
Resume download of the active request. Download may or may not have been paused due to the buffer bei...
Definition: stmr_dl.c:527
void MHEG5QueueDisableStreaming(void)
Do not allow streaming data from the queue.
Definition: stmr_queue.c:609
#define ULL_Get32(variable)
Definition: glue_ulong.h:79
uint32_t U32BIT
Definition: techtype.h:86
IC Streamer download manager.
#define ULL_Add32(variable, value)
Definition: glue_ulong.h:81
#define TRACE(t, x)
Definition: glue_debug.h:118
Header file - Function prototypes for operating system.