48 #define QDBG_FUNC_START(x) TRACE(TICS, (">> %s\n", #x)) 49 #define QDBG_FUNC_END(x) TRACE(TICS, ("<< %s\n", #x)) 51 #define QDBG_FUNC_START(x) 52 #define QDBG_FUNC_END(x) 56 #define QUEUE_CHECK() QueueCheck() 62 #define QUEUE_PRINT() QueuePrint() 67 #define PDBG(x) TRACE(TICS, x) 73 #define MIN_THRESHOLD_SIZE 1048576 81 BOOLEAN IsDataAvailableForStreaming(
void);
84 static void* QueueAlloc(
U32BIT size);
85 static void QueueFree(
void *block);
87 static void QueueCheck(
void);
90 static void QueuePrint(
void);
100 static U32BIT queueAlignMask;
101 static U8BIT *queueMemPool;
102 static U32BIT queueMemPoolSize;
103 static U32BIT queueMinBlockSize;
104 static U32BIT queueLastOffset;
105 static U32BIT queueThresholdSize;
110 static void *queueMutex = NULL;
112 static U32BIT queueStreamingRequestId = 0;
114 static U32BIT queueBufferedBytes;
134 if (queueMutex == NULL)
136 TRACE(TERROR, (
"Streaming queue mutex failure"));
141 queueItemHead = NULL;
142 queueItemTail = NULL;
144 queueBufferedBytes = 0;
175 queueAlign =
sizeof(
U32BIT) * 2;
176 queueAlignMask = ~(queueAlign - 1);
177 queueMemPool = buffer;
178 queueMemPoolSize = bufferSize & queueAlignMask;
179 queueMinBlockSize = 2 *
sizeof(
U32BIT);
183 queueThresholdSize = bufferSize;
190 if (queueMemPoolSize >= 2 * queueMinBlockSize)
195 size = queueMemPoolSize - queueMinBlockSize;
196 assert((size & 0x3) == 0);
201 *((
U32BIT *)(queueMemPool + size -
sizeof(
U32BIT))) = size;
208 *((
U32BIT *)queueMemPool) = size;
215 *((
U32BIT *)(queueMemPool + (size & ~0x3))) = queueMinBlockSize | 0x2;
220 DBG((
"QueueInit: Buffer too small"));
222 queueMemPoolSize = 0;
224 #ifdef ERROR_ON_SMALL_ICS_BUFFER 249 item = QueueAlloc(
sizeof *item);
252 item->
data = QueueAlloc(len);
253 if (item->
data != NULL)
286 if (queueItemHead == NULL)
288 assert(queueItemTail == NULL);
289 queueItemHead = item;
290 queueItemTail = item;
294 assert(queueItemTail != NULL);
295 queueItemTail->
next = item;
296 queueItemTail = item;
299 queueBufferedBytes += item->
len;
300 DBG((
"queueBufferedBytes = %d [1]", queueBufferedBytes));
308 DBG((
"%p (%p,%d) -> ", item, item->
data, item->
len));
313 DBG((
"tail = %p", queueItemTail));
319 if (insertCallback != NULL)
344 if (queueStreamingEnabled)
346 item = queueItemHead;
349 if (item->
requestId == queueStreamingRequestId)
368 item = &queueInvalidItem;
388 if (item == &queueInvalidItem)
423 if (item == queueItemHead)
425 item->
offset += processed;
441 QueueFree(item->
data);
451 DBG((
"%p (%p,%d) -> ", item, item->
data, item->
len));
456 DBG((
"tail = %p", queueItemTail));
462 if (call && releaseCallback != NULL)
464 releaseCallback(requestId, base, position, len, last);
500 if (item == queueItemHead)
503 queueItemHead = queueItemHead->
next;
504 if (item == queueItemTail)
506 assert(queueItemHead == NULL);
507 queueItemTail = NULL;
510 queueBufferedBytes -= item->
len;
511 DBG((
"queueBufferedBytes = %d [2]", queueBufferedBytes));
513 if (queueBufferedBytes < queueThresholdSize)
521 QueueFree(item->
data);
527 PDBG((
"Qbytes=%u, item->referenceCount=%d, item->len=%u", queueBufferedBytes, item->
referenceCount, item->
len));
531 QueueFree(item->
data);
542 DBG((
"%p (%p,%d) -> ", item, item->
data, item->
len));
547 DBG((
"tail = %p", queueItemTail));
553 if (releaseCallback != NULL)
556 releaseCallback(requestId, base, position, len, last);
576 return queueBufferedBytes;
591 PDBG((
"QueueEnableStreaming"));
592 enabled = queueStreamingEnabled;
593 queueStreamingEnabled =
TRUE;
614 PDBG((
"QueueDisableStreaming"));
615 queueStreamingEnabled =
FALSE;
616 queueStreamingRequestId = 0;
628 return queueStreamingEnabled;
643 queueBufferedBytes -= QueueReleaseRequestItems(&queueItemHead,
647 resume = (queueBufferedBytes < queueThresholdSize)?
TRUE :
FALSE;
655 DBG((
"%p (%p,%d) -> ", item, item->
data, item->
len));
660 DBG((
"tail = %p", queueItemTail));
688 pItem = &queueItemHead;
689 while (*pItem != NULL)
703 queueBufferedBytes -= item->
len;
707 QueueFree(item->
data);
720 resume = (queueBufferedBytes < queueThresholdSize)?
TRUE :
FALSE;
723 item = queueItemHead;
726 queueItemTail = NULL;
730 while (item->
next != NULL)
734 queueItemTail = item;
755 QueueReleaseAllItems(&queueItemHead, &queueItemTail);
756 queueBufferedBytes = 0;
758 DBG((
"queueBufferedBytes = 0 [4]"));
777 insertCallback = callback;
792 releaseCallback = callback;
806 queueMemPoolSize = 0;
820 static void* QueueAlloc(
U32BIT size)
835 requestSize = (size +
sizeof(
U32BIT) + queueAlign - 1) & queueAlignMask;
836 QDBG_ALLOC((
"size = %d, requestSize = %d\n", size, requestSize));
838 QDBG_ALLOC((
"queueLastOffset = %d\n", queueLastOffset));
839 offset = queueLastOffset;
840 while (block == NULL && offset < queueMemPoolSize)
842 blockHeader = *((
U32BIT *)(queueMemPool + offset));
843 blockSize = blockHeader & ~0x3;
844 if ((blockHeader & 0x2) || (blockSize < requestSize))
849 QDBG_ALLOC((
"Nothing at %d, trying next offset %d\n",
850 offset, offset + blockSize));
852 if (offset == queueMemPoolSize)
857 if (offset == queueLastOffset)
866 QDBG_ALLOC((
"Found a block - blockSize = %d\n", blockSize));
867 if (blockSize - requestSize >= queueMinBlockSize)
871 newHeader = requestSize;
873 newHeader |= blockHeader & 0x1;
877 *((
U32BIT *)(queueMemPool + offset)) = newHeader;
880 newHeaderPtr = queueMemPool + offset + requestSize;
881 newHeader = blockSize - requestSize;
883 *((
U32BIT *)(newHeaderPtr + newHeader -
sizeof(
U32BIT))) = newHeader;
888 *((
U32BIT *)newHeaderPtr) = newHeader;
902 requestSize = blockSize;
903 newHeader = requestSize;
905 newHeader |= blockHeader & 0x1;
909 *((
U32BIT *)(queueMemPool + offset)) = newHeader;
914 *((
U32BIT *)(queueMemPool + offset + requestSize)) |= 0x1;
919 block = queueMemPool + offset +
sizeof(
U32BIT);
922 queueLastOffset = offset + requestSize;
926 QDBG_ALLOC((
"<< QueueAlloc() -> %p\n", block));
936 static void QueueFree(
void *block)
938 U8BIT *blockHeaderPtr;
939 U8BIT *nextBlockHeaderPtr;
940 U8BIT *prevBlockHeaderPtr;
948 QDBG_FREE((
">> QueueFree(%p)\n", block));
959 blockOffset = (
U8BIT *)block -
sizeof(
U32BIT) - queueMemPool;
960 assert(blockOffset < queueMemPoolSize);
962 blockHeaderPtr = queueMemPool + blockOffset;
963 blockHeader = *((
U32BIT *)blockHeaderPtr);
964 blockSize = blockHeader & ~0x3;
967 assert(blockHeader & 0x2);
971 *((
U32BIT *)blockHeaderPtr) = blockHeader;
974 *((
U32BIT *)(blockHeaderPtr + blockSize -
sizeof(
U32BIT))) = blockSize;
977 nextBlockHeaderPtr = blockHeaderPtr + blockSize;
978 *((
U32BIT *)nextBlockHeaderPtr) &= ~0x1;
982 if ((*((
U32BIT *)nextBlockHeaderPtr) & 0x2) == 0)
985 nextBlockSize = *((
U32BIT *)nextBlockHeaderPtr) & ~0x3;
988 blockSize += nextBlockSize;
989 blockHeader = blockSize | (blockHeader & 0x3);
990 *((
U32BIT *)blockHeaderPtr) = blockHeader;
993 *((
U32BIT *)(blockHeaderPtr + blockSize -
sizeof(
U32BIT))) = blockSize;
999 if (queueLastOffset == nextBlockHeaderPtr - queueMemPool)
1002 queueLastOffset = blockOffset;
1007 if ((blockHeader & 0x1) == 0)
1010 prevBlockSize = *((
U32BIT *)(blockHeaderPtr -
sizeof(
U32BIT)));
1011 prevBlockHeaderPtr = blockHeaderPtr - prevBlockSize;
1012 prevBlockHeader = *((
U32BIT *)prevBlockHeaderPtr);
1015 prevBlockSize += blockSize;
1016 prevBlockHeader = prevBlockSize | (prevBlockHeader & 0x3);
1017 *((
U32BIT *)prevBlockHeaderPtr) = prevBlockHeader;
1020 *((
U32BIT *)(prevBlockHeaderPtr + prevBlockSize -
sizeof(
U32BIT))) = prevBlockSize;
1022 if (queueLastOffset == blockOffset)
1025 queueLastOffset = prevBlockHeaderPtr - queueMemPool;
1038 static void QueueCheck(
void)
1042 U32BIT prevUsed, currUsed, realPrevUsed;
1050 seenLastOffset =
FALSE;
1052 while (offset < queueMemPoolSize)
1054 if (queueLastOffset == offset)
1056 seenLastOffset =
TRUE;
1058 size = *((
U32BIT *)(queueMemPool + offset));
1059 prevUsed = size & 0x1;
1060 currUsed = (size >> 1) & 0x1;
1062 assert(offset + size <= queueMemPoolSize);
1063 assert(prevUsed == realPrevUsed);
1064 realPrevUsed = currUsed;
1065 lastWord = *((
U32BIT *)(queueMemPool + offset + size -
sizeof(
U32BIT)));
1066 assert(currUsed || lastWord == size);
1070 assert(seenLastOffset);
1081 static void QueuePrint(
void)
1085 U32BIT prevUsed, currUsed;
1088 DBG((
">> QueuePrint"));
1091 while (offset < queueMemPoolSize)
1093 size = *((
U32BIT *)(queueMemPool + offset));
1094 prevUsed = size & 0x1;
1095 currUsed = (size >> 1) & 0x1;
1097 lastWord = *((
U32BIT *)(queueMemPool + offset + size -
sizeof(
U32BIT)));
1098 DBG((
"%p: offset=%d, size=%d, prevUsed=%d, currUsed=%d, lastWord=%d",
1099 queueMemPool + offset +
sizeof(
U32BIT),
1100 offset, size, prevUsed, currUsed, lastWord));
1104 DBG((
"<< QueuePrint"));
1130 while (*pItem != NULL)
1135 *pItem = item->
next;
1137 released += item->
len;
1143 QueueFree(item->
data);
1149 pItem = &item->
next;
1155 while (*tail != NULL && (*tail)->
next != NULL)
1157 *tail = (*tail)->
next;
1178 while (item != NULL)
1180 nextItem = item->
next;
1186 QueueFree(item->
data);
#define MIN_THRESHOLD_SIZE
Task functions for IC Streamer.
#define ULL_Sub(variable, value)
Common header internal to IC streamer.
void STB_OSDeleteMutex(void *mutex)
Delete a mutex.
void MHEG5QueueUpdateItem(MHEG5QueueItem *item, U32BIT processed)
Update the number of bytes processed in a queue item. The same item will be returned by the next call...
BOOLEAN MHEG5QueueIsValidItem(MHEG5QueueItem *item)
Tell whether the item returned by MHEG5QueueGetHeadItem is valid. If the item is invalid, then streaming is disabled.
void MHEG5QueueRegisterReleaseCallback(void(*callback)(U32BIT requestId, U64BIT base, U64BIT position, U32BIT len, BOOLEAN last))
Register notification callback for item release events. If a callback is already registered for the e...
struct sMHEG5QueueItem * next
U32BIT MHEG5QueueGetBufferedBytes(void)
Return the number of bytes buffered in the queue (regardless of request).
void MHEG5QueueTerm(void)
Terminate queue manager.
#define QDBG_FUNC_START(x)
void STB_OSMutexUnlock(void *mutex)
Unlock a mutex (a.k.a. 'leave', 'signal' or 'release').
IC Streamer queue manager.
This file defines the profile for the MHEG engine.
void * STB_OSCreateMutex(void)
Create a mutex.
void MHEG5QueueReleaseItems(U32BIT requestId, U64BIT marker)
Release queue items that are related to a given request up to a marker.
void MHEG5StreamerSendSignalToTask(void)
Send a signal to the streamer task to wake it up (if it is asleep).
MHEG5QueueItem * MHEG5QueueGetHeadItem(void)
Return the item at the head of the queue (the next item to consume). The function returns an "invalid...
void MHEG5QueueInsertItem(MHEG5QueueItem *item)
Insert a queue item into the queue.
void MHEG5QueueEnableStreaming(U32BIT requestId)
Allow streaming data from the queue.
void MHEG5QueueRegisterInsertCallback(void(*callback)(U32BIT downloadId, U64BIT base, U64BIT position, U32BIT len, BOOLEAN last))
Register notification callback for item insertion event. If a callback is already registered for the ...
BOOLEAN MHEG5QueueIsStreamingEnabled(void)
Tell whether streaming is enabled.
MHEG5QueueItem * MHEG5QueueAllocItem(U32BIT requestId, U32BIT len)
Allocate a new queue item and initialise it with the request ID and block length.
#define ULL_Compare(first, second)
void STB_OSMutexLock(void *mutex)
Lock a mutex (a.k.a. 'enter', 'wait' or 'get').
E_MhegErr MHEG5QueueInit(U8BIT *buffer, U32BIT bufferSize)
Initialise queue manager.
void MHEG5QueueReleaseRequestItems(U32BIT requestId)
Release all queue items that are related to a given request.
void MHEG5QueueReleaseItem(MHEG5QueueItem *item)
Release an item from the queue. If the item is no longer in the queue (because the queue has been cle...
void SendSignalToTask(void)
void MHEG5QueueReleaseAllItems(void)
Release all queue items - clear the queue completely.
void MHEG5ResumeDownload(void)
Resume download of the active request. Download may or may not have been paused due to the buffer bei...
void MHEG5QueueDisableStreaming(void)
Do not allow streaming data from the queue.
#define ULL_Get32(variable)
IC Streamer download manager.
#define ULL_Add32(variable, value)
Header file - Function prototypes for operating system.