WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
+enum {SAMPLE_PACKET, EOS_PACKET};
+
typedef struct tagQueuedEvent {
+ int type;
struct list entry;
IMediaSample *pSample;
hr = E_OUTOFMEMORY;
break;
}
+ qev->type = SAMPLE_PACKET;
qev->pSample = ppSamples[i];
IMediaSample_AddRef(ppSamples[i]);
list_add_tail(pOutputQueue->SampleList, &qev->entry);
return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
}
+VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
+{
+ if (pOutputQueue->hThread)
+ {
+ EnterCriticalSection(&pOutputQueue->csQueue);
+ if (list_count(pOutputQueue->SampleList) > 0)
+ {
+ pOutputQueue->bSendAnyway = TRUE;
+ SetEvent(pOutputQueue->hProcessQueue);
+ }
+ LeaveCriticalSection(&pOutputQueue->csQueue);
+ }
+}
+
+VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
+{
+ EnterCriticalSection(&pOutputQueue->csQueue);
+ if (pOutputQueue->hThread)
+ {
+ QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
+ if (!qev)
+ {
+ ERR("Out of Memory\n");
+ LeaveCriticalSection(&pOutputQueue->csQueue);
+ return;
+ }
+ qev->type = EOS_PACKET;
+ qev->pSample = NULL;
+ list_add_tail(pOutputQueue->SampleList, &qev->entry);
+ }
+ else
+ {
+ IPin* ppin = NULL;
+ IPin_ConnectedTo((IPin*)pOutputQueue->pInputPin, &ppin);
+ if (ppin)
+ {
+ IPin_EndOfStream(ppin);
+ IPin_Release(ppin);
+ }
+ }
+ LeaveCriticalSection(&pOutputQueue->csQueue);
+ /* Covers sending the Event to the worker Thread */
+ OutputQueue_SendAnyway(pOutputQueue);
+}
+
DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
{
do
{
EnterCriticalSection(&pOutputQueue->csQueue);
if (list_count(pOutputQueue->SampleList) > 0 &&
- (!pOutputQueue->bBatchExact ||
- list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize))
+ (!pOutputQueue->bBatchExact ||
+ list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
+ pOutputQueue->bSendAnyway
+ )
+ )
{
- IMediaSample **ppSamples;
- LONG nSamples;
- LONG nSamplesProcessed;
- struct list *cursor, *cursor2;
- int i = 0;
-
- nSamples = list_count(pOutputQueue->SampleList);
- ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * nSamples);
- LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
+ while (list_count(pOutputQueue->SampleList) > 0)
{
- QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
- list_remove(cursor);
- ppSamples[i++] = qev->pSample;
- HeapFree(GetProcessHeap(),0,qev);
- }
+ IMediaSample **ppSamples;
+ LONG nSamples;
+ LONG nSamplesProcessed;
+ struct list *cursor, *cursor2;
+ int i = 0;
- if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
- {
- IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
- IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
- IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
+ /* First Pass Process Samples */
+ i = list_count(pOutputQueue->SampleList);
+ ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
+ nSamples = 0;
+ LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
+ {
+ QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
+ if (qev->type == SAMPLE_PACKET)
+ ppSamples[nSamples++] = qev->pSample;
+ else
+ break;
+ list_remove(cursor);
+ HeapFree(GetProcessHeap(),0,qev);
+ }
+
+ if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
+ {
+ IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
+ LeaveCriticalSection(&pOutputQueue->csQueue);
+ IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
+ EnterCriticalSection(&pOutputQueue->csQueue);
+ IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
+ }
+ for (i = 0; i < nSamples; i++)
+ IUnknown_Release(ppSamples[i]);
+ HeapFree(GetProcessHeap(),0,ppSamples);
+
+ /* Process Non-Samples */
+ if (list_count(pOutputQueue->SampleList) > 0)
+ {
+ LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
+ {
+ QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
+ if (qev->type == EOS_PACKET)
+ {
+ IPin* ppin = NULL;
+ IPin_ConnectedTo((IPin*)pOutputQueue->pInputPin, &ppin);
+ if (ppin)
+ {
+ IPin_EndOfStream(ppin);
+ IPin_Release(ppin);
+ }
+ }
+ else if (qev->type == SAMPLE_PACKET)
+ break;
+ else
+ FIXME("Unhandled Event type %i\n",qev->type);
+ list_remove(cursor);
+ HeapFree(GetProcessHeap(),0,qev);
+ }
+ }
}
- for (i = 0; i < nSamples; i++)
- IUnknown_Release(ppSamples[i]);
- HeapFree(GetProcessHeap(),0,ppSamples);
+ pOutputQueue->bSendAnyway = FALSE;
}
LeaveCriticalSection(&pOutputQueue->csQueue);
WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);