MHEG5  18.9.0
MHEG5 Documentation
stmr_task.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright © 2014 The DTVKit Open Software Foundation Ltd (www.dtvkit.org)
3  * Copyright © 2010 Ocean Blue Software Ltd
4  *
5  * This file is part of a DTVKit Software Component
6  * You are permitted to copy, modify or distribute this file subject to the terms
7  * of the DTVKit 1.0 Licence which can be found in licence.txt or at www.dtvkit.org
8  *
9  * THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
10  * EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES
11  * OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
12  *
13  * If you or your organisation is not a member of DTVKit then you have access
14  * to this source code outside of the terms of the licence agreement
15  * and you are expected to delete this and any associated files immediately.
16  * Further information on DTVKit, membership and terms can be found at www.dtvkit.org
17  *******************************************************************************/
25 /*---includes for this file--------------------------------------------------*/
26 #include <assert.h>
27 #include <string.h>
28 
29 #include "mh5debug.h"
30 #include "httptype.h"
31 #include "stb_os.h"
32 #include "dvb_ics.h"
33 
34 #include "stmr_queue.h"
35 #include "stmr_common.h"
36 
37 /*---constant definitions for this file--------------------------------------*/
38 
39 /* Look-ahead time difference for flow control (in milliseconds). This
40  * determines how far "stream time" can be ahead of "real time". Larger
41  * values introduce risk of overflow, smaller values introduce risks of
42  * underflow. 1000ms seems to be a good value for the DTG tests.
43  */
44 #define LOOK_AHEAD_DIFF 1000
45 
46 /* Look-ahead threshold takes care of "strange" PCR differences, for example
47  * if the stream loops (PCR jumps to the initial value), or if two streams
48  * were concatenated with completely different PCRs (unpredictable jump).
49  * The look-ahead is expected to be somewhere between 0 and LOOK_AHEAD_DIFF
50  * (could be slightly negative if the platform blocks while data is played).
51  * If it escapes from this range, new values are calculated.
52  */
53 #define LOOK_AHEAD_THRESHOLD 2500
54 
55 #define PDBG(x) TRACE(TICS, x)
56 #define DBG(x)
57 
58 
59 /*---local typedef structs for this file-------------------------------------*/
60 
61 typedef enum
62 {
66 } TaskState;
67 
68 /*---local function definitions----------------------------------------------*/
69 
70 /*---global function definitions---------------------------------------------*/
71 static void TaskMain(void *task_arg);
72 static void HandleQueueItem(MHEG5QueueItem *item);
73 
74 /*---local (static) variable declarations for this file----------------------*/
75 static void *taskSemaphore = NULL;
76 static void *taskHandle;
77 
78 static void (*underflowCallback)(U32BIT requestId,U32BIT playoutTime) = NULL;
79 
80 static volatile TaskState taskState;
81 static volatile BOOLEAN producerCanSignal = FALSE;
82 
83 
93 {
94  E_MhegErr result;
95  taskState = TASK_RUNNING;
96  taskSemaphore = STB_OSCreateSemaphore();
97  if (taskSemaphore == NULL)
98  {
100  }
101  else
102  {
103  /* Initial value for semaphore is 1, so do wait to decrement value to 0 */
104  STB_OSSemaphoreWait(taskSemaphore);
105  taskHandle = STB_OSCreateTask(TaskMain, NULL, stack, priority, (U8BIT *)"MHST");
106  if (taskHandle == NULL)
107  {
109  }
110  else
111  {
112  result = MHERR_OK;
113  }
114  }
115  return result;
116 }
117 
123 {
124  if (taskSemaphore != NULL && producerCanSignal)
125  {
126  producerCanSignal = FALSE;
127  STB_OSSemaphoreSignal(taskSemaphore);
128  }
129 }
130 
137 {
138  underflowCallback = callback;
139 }
140 
146 {
147  void *semaphore;
148 
149  taskState = TASK_STOP_REQUEST;
151 
152  /* It's possible that the semaphore was signalled by someone else. We
153  * have to check the stop conditions and keep checking until the
154  * task signals the semaphore to announce that it's stopping.
155  */
156  while (taskState == TASK_RUNNING || taskState == TASK_STOP_REQUEST)
157  {
158  /* Just busy wait for the task to acknowledge */
159  }
160  taskHandle = NULL;
161 
162  semaphore = taskSemaphore;
163  taskSemaphore = NULL;
164  STB_OSDeleteSemaphore(semaphore);
165 }
166 
167 /*******************
168  * LOCAL FUNCTION *
169  ********************/
170 
178 static void TaskMain(void *task_arg)
179 {
180  MHEG5QueueItem *item;
181  U32BIT lastDownloadId;
182  U32BIT lastRequestId;
183  U32BIT refItemTime, lastItemTime;
184  U32BIT clockTime, refClockTime;
185  U32BIT relativeClockTime, relativeItemTime;
186  S32BIT lastLookahead;
187 #ifdef TRACING
188  U32BIT prevItemTime = 0;
189 #endif
190 
191  refItemTime = 0;
192  lastItemTime = 0;
193  relativeItemTime = 0;
194  lastLookahead = 0;
195 
196  lastDownloadId = 0;
197  lastRequestId = 0;
198 
199  DBG(("Entering main loop"));
200  while (taskState == TASK_RUNNING)
201  {
202  PDBG(("Waiting for semaphore"));
203  producerCanSignal = TRUE;
204  STB_OSSemaphoreWait(taskSemaphore);
205  producerCanSignal = FALSE;
206 
207  /* Reference clock time depends on last look-ahead:
208  *
209  * lastLookahead = relativeItemTime - relativeClockTime (1)
210  * relativeClockTime = clockTime - refClockTime
211  *
212  * So:
213  *
214  * refClockTime = clockTime - relativeClockTime
215  * relativeClockTime = relativeItemTime - lastLookahead
216  *
217  * And so:
218  *
219  * refClockTime = clockTime - (relativeItemTime - lastLookahead)
220  * = lastLookahead + clockTime - relativeItemTime
221  *
222  * lastLookahead - relativeItemTime = -relativeClockTime (according
223  * to (1)). As clockTime - relativeClockTime should be positive,
224  * refClockTime should also be positive.
225  *
226  */
227 
228  clockTime = STB_OSGetClockMilliseconds();
229  refClockTime = lastLookahead + clockTime - relativeItemTime;
230  PDBG(("Recalculating refClockTime: %d + %d - %d = %d",
231  lastLookahead, clockTime, relativeItemTime, refClockTime));
232  DBG(("taskState = %d", taskState));
233  while (taskState == TASK_RUNNING)
234  {
235  item = MHEG5QueueGetHeadItem();
236  DBG(("item = %p [%d]", item, MHEG5QueueIsValidItem(item)));
237  if (item == NULL)
238  {
239  PDBG(("Streaming stopped RID=%d",lastRequestId));
240  if (underflowCallback != NULL)
241  {
242  PDBG(("lla=%d-(%d-%d)",lastLookahead,STB_OSGetClockMilliseconds(),clockTime));
243  if (lastLookahead > 0)
244  {
245  refClockTime = STB_OSGetClockMilliseconds();
246  clockTime += lastLookahead;
247  if (clockTime > refClockTime)
248  {
249  clockTime -= refClockTime;
250  }
251  else
252  {
253  clockTime = 0;
254  }
255  }
256  else
257  {
258  clockTime = 0;
259  }
260  underflowCallback(lastRequestId,clockTime);
261  }
262  break;
263  }
264  if (!MHEG5QueueIsValidItem(item))
265  {
266  PDBG(("Invalid item, streaming stopped"));
267  break;
268  }
269  clockTime = STB_OSGetClockMilliseconds();
270 
271  if (lastDownloadId == item->downloadId)
272  {
273  if (refClockTime == 0 && clockTime != 0)
274  {
275  /* Update first time */
276  refClockTime = clockTime;
277  }
278  if (item->timestamp != 0)
279  {
280  if (refItemTime == 0)
281  {
282  /* Update first item time */
283  refItemTime = item->timestamp;
284  }
285  #ifdef TRACING
286  prevItemTime = lastItemTime;
287  #endif
288  lastItemTime = item->timestamp;
289  }
290  if ((clockTime < refClockTime) ||
291  (lastItemTime < refItemTime) ||
292  (lastLookahead < -LOOK_AHEAD_THRESHOLD) ||
293  (lastLookahead > LOOK_AHEAD_THRESHOLD))
294  {
295  /* Overflow or other issue - reset */
296  PDBG(("Resetting PCR monitor - invalid values (%u,%u) (%u,%u) %d prev=%u",
297  clockTime,refClockTime, lastItemTime,refItemTime, lastLookahead, prevItemTime ));
298  refClockTime = 0;
299  refItemTime = 0;
300  lastItemTime = 0;
301  lastLookahead = 0;
302  relativeItemTime = 0;
303  }
304  }
305  else
306  {
307  /* New download - reset */
308  PDBG(("Resetting PCR monitor RID=%d->%d DID=%d->%d",lastRequestId,item->requestId,lastDownloadId,item->downloadId));
309  PDBG(("Timestamp=%d -> %d",lastItemTime,item->timestamp));
310  refClockTime = 0;
311  refItemTime = 0;
312  lastItemTime = 0;
313  lastLookahead = 0;
314  relativeItemTime = 0;
315  }
316  lastRequestId = item->requestId;
317  lastDownloadId = item->downloadId;
318 
319  if (refClockTime == 0 || refItemTime == 0)
320  {
321  HandleQueueItem(item);
322  STB_OSTaskDelay(1);
323  }
324  else
325  {
326  assert(clockTime >= refClockTime);
327  assert(lastItemTime >= refItemTime);
328 
329  relativeClockTime = clockTime - refClockTime;
330  relativeItemTime = lastItemTime - refItemTime;
331  lastLookahead = relativeItemTime - relativeClockTime;
332 
333  DBG(("curr [%d] - ref [%d] = %d, "
334  "item [%d] - ref [%d] = %d, lla = %d",
335  clockTime, refClockTime, clockTime - refClockTime,
336  lastItemTime, refItemTime, lastItemTime - refItemTime, lastLookahead));
337  if (lastLookahead > LOOK_AHEAD_DIFF)
338  {
339  DBG(("Flow control - pausing"));
340  MHEG5QueueUpdateItem(item, 0);
341  STB_OSTaskDelay((U16BIT)(lastLookahead-LOOK_AHEAD_DIFF));
342  }
343  else
344  {
345  HandleQueueItem(item);
346  STB_OSTaskDelay(1);
347  }
348  }
349  }
350  }
351 
352  DBG(("TaskMain: Left main loop"));
353  taskState = TASK_STOPPED;
354 
355  STB_OSDestroyTask(taskHandle);
356 }
357 
365 static void HandleQueueItem(MHEG5QueueItem *item)
366 {
367  U8BIT *data;
368  U32BIT len;
369  U32BIT processed;
370  BOOLEAN last;
371 
372  /* Part of the item may have been processed already */
373  data = item->data + item->offset;
374  len = item->len - item->offset;
375  last = item->last;
376 
377  DBG(("TaskMain: Sending %d/%d bytes (offset %d, last=%s)",
378  len, item->len, item->offset, last ? "TRUE" : "FALSE"));
379 
380  /* Send data to external application */
381  processed = DVB_MhegICStreamHandleData(data, len, last);
382  if (processed == len)
383  {
384  /* The entire item was processed - release the item */
385  MHEG5QueueReleaseItem(item);
386  }
387  else
388  {
389  /* Item still contains unprocessed data - update and keep it */
390  MHEG5QueueUpdateItem(item, processed);
391  }
392 }
393 
U32BIT STB_OSGetClockMilliseconds(void)
Get Current Computer Clock Time.
void STB_OSSemaphoreWait(void *semaphore)
Wait on Semaphore Indefinitely or Until Released.
U32BIT DVB_MhegICStreamHandleData(U8BIT *data, U32BIT len, BOOLEAN last)
Handle transport stream data. The data is part of a single-program transport stream, containing audio, video and/or subtitles (in one or more languages). The video is H.264 SD video and the audio is HE-AAC audio. The stream contains PAT and PMT (other SI data can be ignored). This function can block until the data has been processed / buffered. It may also return when part of the block has been processed. This would cause a subsequent call with the rest of the block. When the last block is presented (last=TRUE), the function should indicate that the block has been completely processed only when this is a true reflection of the presentation status.
const char * data
Definition: mh5gate.c:56
void STB_OSTaskDelay(U16BIT timeout)
Delay Task for Specified Time Period.
Common header internal to IC streamer.
E_MhegErr MHEG5StreamerStartTask(U32BIT stack, U8BIT priority)
Start streamer task. The task passes stream data to the external application and generates stream and...
Definition: stmr_task.c:92
#define LOOK_AHEAD_THRESHOLD
Definition: stmr_task.c:53
void MHEG5QueueUpdateItem(MHEG5QueueItem *item, U32BIT processed)
Update the number of bytes processed in a queue item. The same item will be returned by the next call...
Definition: stmr_queue.c:406
BOOLEAN MHEG5QueueIsValidItem(MHEG5QueueItem *item)
Tell whether the item returned by MHEG5QueueGetHeadItem is valid. If the item is invalid, then streaming is disabled.
Definition: stmr_queue.c:384
E_MhegErr
Definition: mherrors.h:28
#define LOOK_AHEAD_DIFF
Definition: stmr_task.c:44
U32BIT timestamp
Definition: stmr_queue.h:56
uint8_t U8BIT
Definition: techtype.h:82
void MHEG5StreamerRegisterUnderflowCallback(void(*callback)(U32BIT requestId, U32BIT playoutTime))
Register notification callback for underflow events. If a callback is already registered for the even...
Definition: stmr_task.c:136
TaskState
Definition: stmr_task.c:61
IC Streamer queue manager.
void * STB_OSCreateSemaphore(void)
Create a Binary Semaphore. That is maximum value of 1. The initial value should be 1...
void MHEG5StreamerSendSignalToTask(void)
Send signal to streamer task to wake it up (if it's asleep)
Definition: stmr_task.c:122
MHEG5QueueItem * MHEG5QueueGetHeadItem(void)
Return the item at the head of the queue (the next item to consume). The function returns an "invalid...
Definition: stmr_queue.c:336
int len
Definition: mh5gate.c:57
int32_t S32BIT
Definition: techtype.h:87
uint16_t U16BIT
Definition: techtype.h:84
Mheg5 logging and debug printing.
#define PDBG(x)
Definition: stmr_task.c:55
BOOLEAN last
Definition: stmr_queue.h:57
U32BIT requestId
Definition: stmr_queue.h:49
#define DBG(x)
Definition: stmr_task.c:56
#define FALSE
Definition: techtype.h:68
void MHEG5QueueReleaseItem(MHEG5QueueItem *item)
Release an item from the queue. If the item is no longer in the queue (because the queue has been cle...
Definition: stmr_queue.c:477
Interaction Channel Streaming functions required by MHEG5 engine References: [1] UK1 Profile - Digita...
U8BIT BOOLEAN
Definition: techtype.h:99
U8BIT * data
Definition: stmr_queue.h:51
U32BIT downloadId
Definition: stmr_queue.h:50
#define TRUE
Definition: techtype.h:69
HTTP types.
void * STB_OSCreateTask(void(*function)(void *), void *param, U32BIT stack, U8BIT priority, U8BIT *name)
Create a New Task to the calling process. Upon success, the created task runs on its own stack...
void MHEG5StreamerStopTask(void)
Stop streamer task. This function blocks until the task is stopped.
Definition: stmr_task.c:145
uint32_t U32BIT
Definition: techtype.h:86
void STB_OSDestroyTask(void *task)
Delete Task must be called upon termination of each task as it frees all OS specific resources alloca...
void STB_OSDeleteSemaphore(void *semaphore)
Delete a Semaphore.
void STB_OSSemaphoreSignal(void *semaphore)
Signal a Semaphore to Release it by decrementing its counter.
Header file - Function prototypes for operating system.