/* MtDec.c -- Multi-thread Decoder
2018-07-04 : Igor Pavlov : Public domain */
#include "Precomp.h"
// #define SHOW_DEBUG_INFO
// #include <stdio.h>
#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#endif
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
#include "MtDec.h"
#ifndef _7ZIP_ST
/*
  MtProgress_Init
  Prepares a progress tracker for a new run:
    - remembers the progress callback (stored as given),
    - clears the sticky result code to SZ_OK,
    - zeroes the accumulated input / output byte totals.
  The critical section p->cs is not touched here.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->totalInSize = 0;
  p->totalOutSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
/*
  MtProgress_Progress_ST
  Reports the current totals to the progress callback WITHOUT taking p->cs.
  Nothing is reported when an error is already recorded or when no callback is set.
  A non-SZ_OK answer from the callback is recorded as SZ_ERROR_PROGRESS.
  Returns the (possibly updated) sticky result code.
*/
SRes MtProgress_Progress_ST(CMtProgress *p)
{
  if (p->res != SZ_OK || !p->progress)
    return p->res;

  if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
    p->res = SZ_ERROR_PROGRESS;

  return p->res;
}
/*
  MtProgress_ProgressAdd
  Adds (inSize, outSize) to the running totals and reports the new totals to the
  progress callback, all while holding p->cs.
  The callback is skipped when an error is already recorded or no callback is set;
  a non-SZ_OK answer from the callback is recorded as SZ_ERROR_PROGRESS.
  Returns the sticky result code as read before the lock is released.
*/
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
{
  SRes status;

  CriticalSection_Enter(&p->cs);

  p->totalInSize += inSize;
  p->totalOutSize += outSize;

  if (p->res == SZ_OK && p->progress)
  {
    const SRes callbackRes = ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize);
    if (callbackRes != SZ_OK)
      p->res = SZ_ERROR_PROGRESS;
  }

  status = p->res;

  CriticalSection_Leave(&p->cs);

  return status;
}
/*
  MtProgress_GetError
  Returns a snapshot of the sticky result code, read under p->cs.
*/
SRes MtProgress_GetError(CMtProgress *p)
{
  SRes snapshot;

  CriticalSection_Enter(&p->cs);
  snapshot = p->res;
  CriticalSection_Leave(&p->cs);

  return snapshot;
}
/*
  MtProgress_SetError
  Records (res) as the sticky result code, under p->cs, but only if no error
  has been recorded yet: the first error wins and later ones are ignored.
*/
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  {
    const int noErrorYet = (p->res == SZ_OK);
    if (noErrorYet)
      p->res = res;
  }
  CriticalSection_Leave(&p->cs);
}
/* RINOK_THREAD(x) is a plain alias of RINOK(x). In this file it wraps the
   threading calls (Event_Wait / Event_Set) whose failure must end the enclosing function. */
#define RINOK_THREAD(x) RINOK(x)
/*
  ArEvent_OptCreate_And_Reset
  Leaves (p) as an existing, non-signaled event:
    - an already created event is reset,
    - otherwise a new auto-reset event is created in the non-signaled state.
  Returns the WRes of whichever call was made.
*/
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  return Event_IsCreated(p)
      ? Event_Reset(p)
      : AutoResetEvent_CreateNotSignaled(p);
}
/*
  CMtDecBufLink
  Header stored at the start of every input buffer allocation made in this file.
  Buffers are allocated as (MTDEC__LINK_DATA_OFFSET + inBufSize) bytes; the payload
  begins right after this header, and buffers of one thread are chained through (next).
*/
typedef struct
{
void *next; /* next buffer in the chain, or NULL for the last one */
void *pad[3]; /* unused; presumably rounds the header up to 4 pointers so the payload stays aligned - TODO confirm */
} CMtDecBufLink;
/* Size of the link header == byte offset of the payload inside an allocation. */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
/* Converts a pointer to an allocation (its link header) into a pointer to its payload bytes. */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
/* Forward declaration: MtDecThread_CreateAndStart() passes ThreadFunc to Thread_Create()
   before the function is defined further down in this file. */
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
/*
  MtDecThread_CreateEvents
  Makes sure both events of thread slot (t) exist and are non-signaled.
  canWrite is prepared first, then canRead; the first failure is returned
  and the second event is then left untouched.
  Returns 0 on success, otherwise the failing WRes.
*/
static WRes MtDecThread_CreateEvents(CMtDecThread *t)
{
  WRes wres;

  wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
  if (wres != 0)
    return wres;

  wres = ArEvent_OptCreate_And_Reset(&t->canRead);
  if (wres != 0)
    return wres;

  return SZ_OK;
}
/*
  MtDecThread_CreateAndStart
  Prepares the events of slot (t) and starts its worker thread if it was not
  created before. A thread that already exists is reused as is.
  Returns SZ_OK on success; a WRes failure from event or thread creation is
  converted with MY_SRes_HRESULT_FROM_WRes().
*/
static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
{
  WRes wres = MtDecThread_CreateEvents(t);

  if (wres != 0)
    return MY_SRes_HRESULT_FROM_WRes(wres);

  /* events are ready; an existing worker only needs its events reset */
  if (Thread_WasCreated(&t->thread))
    return SZ_OK;

  wres = Thread_Create(&t->thread, ThreadFunc, t);
  if (wres != 0)
    return MY_SRes_HRESULT_FROM_WRes(wres);

  return SZ_OK;
}
/*
  MtDecThread_FreeInBufs
  Releases the whole chain of input buffers owned by thread slot (t).
  t->inBuf is detached (set to NULL) before the chain is walked, and every
  link is returned to the allocator of the owning decoder (t->mtDec->alloc).
  Does nothing when the slot has no buffers.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  void *cur = t->inBuf;

  if (!cur)
    return;

  t->inBuf = NULL;

  while (cur)
  {
    /* fetch the successor before the current link is freed */
    void *following = ((CMtDecBufLink *)cur)->next;
    ISzAlloc_Free(t->mtDec->alloc, cur);
    cur = following;
  }
}
/*
  MtDecThread_CloseThread
  Shuts down the worker of slot (t) and closes its events.
  If a thread was created, both events are signaled first so that a worker
  blocked in Event_Wait() wakes up (the worker loop checks p->exitThread after
  each wait; this function does not set that flag itself), then the thread is
  joined and its handle closed.
  The events are closed in every case, created thread or not.
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    /* normally no thread is waiting on canWrite at this point; the signal is kept as a safeguard */
    Event_Set(&t->canWrite);
    Event_Set(&t->canRead);

    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
/*
  MtDec_CloseThreads
  Closes every thread slot of the decoder, in index order (0 .. MTDEC__THREADS_MAX - 1).
  Input buffers of the slots are NOT released here.
*/
static void MtDec_CloseThreads(CMtDec *p)
{
  unsigned slot = 0;

  while (slot < MTDEC__THREADS_MAX)
  {
    MtDecThread_CloseThread(&p->threads[slot]);
    slot++;
  }
}
/*
  MtDecThread_Destruct
  Full teardown of one thread slot: the worker is stopped and its events are
  closed first, and only then are the slot's input buffers released.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  /* step 1: stop the worker, close events */
  MtDecThread_CloseThread(t);

  /* step 2: free the buffer chain */
  MtDecThread_FreeInBufs(t);
}
/*
  FullRead
  Reads from (stream) into (data) until the requested amount is received,
  the stream returns 0 bytes, or a read fails.
    in:  *processedSize = number of bytes wanted
    out: *processedSize = number of bytes actually stored in (data)
  The byte count reported by a failing read is still added to *processedSize
  before the error is returned.
  Returns SZ_OK (also when the stream ended early) or the error code of the read.
*/
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t remaining = *processedSize;

  *processedSize = 0;

  for (; remaining != 0;)
  {
    size_t chunk = remaining;
    const SRes readRes = ISeqInStream_Read(stream, data, &chunk);

    *processedSize += chunk;
    data += chunk;
    remaining -= chunk;

    if (readRes != SZ_OK)
      return readRes;

    /* a zero-length read means no more data is available */
    if (chunk == 0)
      break;
  }

  return SZ_OK;
}
/*
  MtDec_GetError_Spec
  Under the progress critical section:
    - stores in *wasInterrupted whether an interrupt is pending AND the given
      (interruptIndex) is strictly greater than the recorded interrupt index,
    - reads the sticky progress result code.
  Returns that result code.
*/
static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  SRes status;

  CriticalSection_Enter(&p->mtProgress.cs);

  status = p->mtProgress.res;
  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);

  CriticalSection_Leave(&p->mtProgress.cs);

  return status;
}
/*
  MtDec_Progress_GetError_Spec
  Combined "add progress" + "query state", done in one critical section:
    1) (inSize, outSize) are added to the running totals,
    2) the progress callback is invoked with the new totals, unless an error is
       already recorded or no callback is set; a non-SZ_OK answer is recorded
       as SZ_ERROR_PROGRESS,
    3) *wasInterrupted is set when an interrupt is pending and (interruptIndex)
       is strictly greater than the recorded interrupt index.
  Returns the sticky progress result code.
*/
static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  SRes status;

  CriticalSection_Enter(&p->mtProgress.cs);

  p->mtProgress.totalInSize += inSize;
  p->mtProgress.totalOutSize += outSize;

  if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
  {
    const SRes callbackRes = ICompressProgress_Progress(
        p->mtProgress.progress,
        p->mtProgress.totalInSize,
        p->mtProgress.totalOutSize);
    if (callbackRes != SZ_OK)
      p->mtProgress.res = SZ_ERROR_PROGRESS;
  }

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  status = p->mtProgress.res;

  CriticalSection_Leave(&p->mtProgress.cs);

  return status;
}
/*
  MtDec_Interrupt
  Requests an interrupt at (interruptIndex), under the progress critical section.
  The recorded index only ever moves down: a request is stored when no interrupt
  is pending yet, or when it is strictly lower than the one already recorded.
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);

  if (p->needInterrupt && interruptIndex >= p->interruptIndex)
  {
    /* an interrupt at the same or a lower index is already recorded - keep it */
  }
  else
  {
    p->interruptIndex = interruptIndex;
    p->needInterrupt = True;
  }

  CriticalSection_Leave(&p->mtProgress.cs);
}
/*
  MtDec_GetCrossBuff
  Returns a pointer to the payload area of the decoder's cross block,
  allocating the block on first use. The allocation has the same layout as a
  thread input buffer: link header followed by p->inBufSize bytes.
  Returns NULL if the allocation fails (p->crossBlock then stays NULL).
*/
Byte *MtDec_GetCrossBuff(CMtDec *p)
{
  if (!p->crossBlock)
  {
    Byte *fresh = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    if (!fresh)
      return NULL;
    p->crossBlock = fresh;
  }

  return MTDEC__DATA_PTR_FROM_LINK(p->crossBlock);
}
/*
ThreadFunc2() returns:
0 - in all normal cases (even for a stream error or a memory allocation error)
(!= 0) - WRes error returned by a system threading function
*/
// MTDEC_ProgessStep: minimum number of newly processed bytes (input or output)
// that must accumulate before ThreadFunc2() makes another progress / interrupt check.
// The active value is (1 << 0) == 1, so any nonzero advance qualifies;
// the (1 << 22) alternative is kept below, disabled.
// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)
/*
ThreadFunc2 - main loop of one decoder thread slot.

Every pass of the outer for(;;) handles one block and goes through these phases:

  READ / PARSE  - waits for t->canRead, takes the next block index, then fills the
                  slot's chain of input buffers. The first chunk comes from the cross
                  block if it still holds data (p->crossStart .. p->crossEnd), otherwise
                  chunks are read from p->inStream. Each chunk is handed to
                  mtCallback->Parse() until Parse reports something other than
                  MTDEC_PARSE_CONTINUE (or input is finished).
                  Unparsed tail bytes are either moved to the cross block for the next
                  block, or (after MTDEC_PARSE_END) passed later to Write() as afterEndData.
  PRE-CODE      - mtCallback->PreCode(); on failure this block is not coded here.
  HAND-OVER     - unless (finish) is set, optionally starts one more thread and signals
                  canRead of the next slot so that it can begin reading the next block.
  CODE          - feeds the buffered chunks to mtCallback->Code().
  WRITE         - waits for t->canWrite, publishes error state into CMtDec, calls
                  mtCallback->Write(), optionally registers this slot's buffers as
                  "filled" (kept for later MtDec_Read()), then signals canWrite of the
                  next slot, or of slot 0 when finishing.

Returns 0 in all normal cases; returns a nonzero WRes only when Event_Wait / Event_Set
fails (through RINOK_THREAD).

NOTE(review): memcpy() is used below, but no <string.h> include is visible in this file;
presumably it is provided through Precomp.h or MtDec.h - confirm.
NOTE(review): p->blockIndex, p->readWasFinished, p->crossStart / p->crossEnd and similar
fields are accessed here without a lock; presumably this is safe because only the slot
that has received canRead / canWrite runs the corresponding phase - confirm.
*/
static WRes ThreadFunc2(CMtDecThread *t)
{
CMtDec *p = t->mtDec;
PRF_STR_INT("ThreadFunc2", t->index);
// SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
for (;;)
{
// res     : result of read / progress / write steps
// codeRes : result of PreCode() / Code()
SRes res, codeRes;
BoolInt wasInterrupted, isAllocError, overflow, finish;
SRes threadingErrorSRes;
BoolInt needCode, needWrite, needContinue;
// inDataSize_Start : number of bytes of this block held in the first buffer of the chain
// inDataSize       : total number of bytes of this block held in the whole chain
size_t inDataSize_Start;
UInt64 inDataSize;
// UInt64 inDataSize_Full;
UInt64 blockIndex;
// inPrev / outPrev : positions at the time of the last progress report
UInt64 inPrev = 0;
UInt64 outPrev = 0;
UInt64 inCodePos;
UInt64 outCodePos;
// bytes that follow the parsed data after MTDEC_PARSE_END; passed to Write()
Byte *afterEndData = NULL;
size_t afterEndData_Size = 0;
BoolInt canCreateNewThread = False;
// CMtDecCallbackInfo parse;
CMtDecThread *nextThread;
PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
// ---------- READ / PARSE ----------
RINOK_THREAD(Event_Wait(&t->canRead));
if (p->exitThread)
return 0;
PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
// if (t->index == 3) return 19; // for test
blockIndex = p->blockIndex++;
// PRF(printf("\ncanRead\n"))
// zero-size progress call: used here to fetch the error state and the interrupt flag
res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
finish = p->readWasFinished;
needCode = False;
needWrite = False;
isAllocError = False;
overflow = False;
inDataSize_Start = 0;
inDataSize = 0;
// inDataSize_Full = 0;
if (res == SZ_OK && !wasInterrupted)
{
// if (p->inStream)
{
// prev / link walk the slot's buffer chain; buffers left from earlier blocks are reused
CMtDecBufLink *prev = NULL;
CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
// number of bytes still waiting in the cross block
size_t crossSize = p->crossEnd - p->crossStart;
PRF(printf("\ncrossSize = %d\n", crossSize));
for (;;)
{
if (!link)
{
// chain exhausted: allocate one more buffer and append it
link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
if (!link)
{
finish = True;
// p->allocError_for_Read_BlockIndex = blockIndex;
isAllocError = True;
break;
}
link->next = NULL;
if (prev)
{
// static unsigned g_num = 0;
// printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
prev->next = link;
}
else
t->inBuf = (void *)link;
}
{
// data      : payload of the current buffer
// parseData : where Parse() reads from (the cross block for the first chunk, else data)
Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
Byte *parseData = data;
size_t size;
if (crossSize != 0)
{
// first chunk comes from the cross block; nothing is read from the stream
inDataSize = crossSize;
// inDataSize_Full = inDataSize;
inDataSize_Start = crossSize;
size = crossSize;
parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
(int)p->crossStart, (int)p->crossEnd, (int)finish));
}
else
{
size = p->inBufSize;
res = FullRead(p->inStream, data, &size);
// size = 10; // test
inDataSize += size;
// inDataSize_Full = inDataSize;
if (!prev)
inDataSize_Start = size;
p->readProcessed += size;
// a short read marks the end of input
finish = (size != p->inBufSize);
if (finish)
p->readWasFinished = True;
// res = E_INVALIDARG; // test
if (res != SZ_OK)
{
// PRF(printf("\nRead error = %d\n", res))
// we want to decode all data before error
p->readRes = res;
// p->readError_BlockIndex = blockIndex;
p->readWasFinished = True;
finish = True;
res = SZ_OK;
// break;
}
if (inDataSize - inPrev >= MTDEC_ProgessStep)
{
// zero-size progress call: only checks for error / interrupt while reading
res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
if (res != SZ_OK || wasInterrupted)
break;
inPrev = inDataSize;
}
}
{
CMtDecCallbackInfo parse;
parse.startCall = (prev == NULL);
parse.src = parseData;
parse.srcSize = size;
parse.srcFinished = finish;
parse.canCreateNewThread = True;
// PRF(printf("\nParse size = %d\n", (unsigned)size))
// after the call, parse.srcSize is used below as the number of bytes consumed by Parse()
p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
needWrite = True;
canCreateNewThread = parse.canCreateNewThread;
// printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
if (
// parseRes != SZ_OK ||
// inDataSize - (size - parse.srcSize) > p->inBlockMax
// ||
parse.state == MTDEC_PARSE_OVERFLOW
// || wasInterrupted
)
{
// Overflow or Parse error - switch from MT decoding to ST decoding
finish = True;
overflow = True;
{
PRF(printf("\n Overflow"));
// PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
}
// keep the whole cross chunk in the slot's own buffer before the cross block is reset
if (crossSize != 0)
memcpy(data, parseData, size);
p->crossStart = 0;
p->crossEnd = 0;
break;
}
if (crossSize != 0)
{
// copy the parsed part of the cross chunk into the slot's own buffer
memcpy(data, parseData, parse.srcSize);
p->crossStart += parse.srcSize;
}
if (parse.state != MTDEC_PARSE_CONTINUE || finish)
{
// we don't need to parse in current thread anymore
if (parse.state == MTDEC_PARSE_END)
finish = True;
needCode = True;
// p->crossFinished = finish;
if (parse.srcSize == size)
{
// full parsed - no cross transfer
p->crossStart = 0;
p->crossEnd = 0;
break;
}
if (parse.state == MTDEC_PARSE_END)
{
// end marker found with bytes left over: they become afterEndData for Write()
p->crossStart = 0;
p->crossEnd = 0;
if (crossSize != 0)
memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
afterEndData_Size = size - parse.srcSize;
afterEndData = parseData + parse.srcSize;
// we reduce data size to required bytes (parsed only)
inDataSize -= (size - parse.srcSize);
if (!prev)
inDataSize_Start = parse.srcSize;
break;
}
{
// partial parsed - need cross transfer
if (crossSize != 0)
inDataSize = parse.srcSize; // it's only parsed now
else
{
// partial parsed - is not in initial cross block - we need to copy new data to cross block
Byte *cr = MtDec_GetCrossBuff(p);
if (!cr)
{
{
PRF(printf("\ncross alloc error error\n"));
// res = SZ_ERROR_MEM;
finish = True;
// p->allocError_for_Read_BlockIndex = blockIndex;
isAllocError = True;
break;
}
}
{
// move the unparsed tail to the cross block; it is the start of the next block's input
size_t crSize = size - parse.srcSize;
inDataSize -= crSize;
p->crossEnd = crSize;
p->crossStart = 0;
memcpy(cr, parseData + parse.srcSize, crSize);
}
}
// inDataSize_Full = inDataSize;
if (!prev)
inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
// data remains for the next block, so decoding goes on
finish = False;
break;
}
}
// MTDEC_PARSE_CONTINUE with more input expected: Parse() must have consumed the whole chunk
if (parse.srcSize != size)
{
res = SZ_ERROR_FAIL;
PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
break;
}
}
}
// advance to the next buffer of the chain (allocated at the top of the loop if missing)
prev = link;
link = link->next;
if (crossSize != 0)
{
// the cross chunk was fully consumed; following chunks come from the stream
crossSize = 0;
p->crossStart = 0;
p->crossEnd = 0;
}
}
}
if (res == SZ_OK)
res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
}
// ---------- PRE-CODE ----------
codeRes = SZ_OK;
if (res == SZ_OK && needCode && !wasInterrupted)
{
codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
if (codeRes != SZ_OK)
{
needCode = False;
finish = True;
// SZ_ERROR_MEM is expected error here.
// if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
// if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
}
}
if (res != SZ_OK || wasInterrupted)
finish = True;
// ---------- HAND-OVER TO NEXT THREAD ----------
nextThread = NULL;
threadingErrorSRes = SZ_OK;
if (!finish)
{
if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
{
SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
if (res2 == SZ_OK)
{
// if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
p->numStartedThreads++;
}
else
{
PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
if (p->numStartedThreads == 1)
{
// if only one thread is possible, we leave multi-threading code
finish = True;
needCode = False;
threadingErrorSRes = res2;
}
else
// some threads are already running: continue with that number and stop creating more
p->numStartedThreads_Limit = p->numStartedThreads;
}
}
if (!finish)
{
// slots are used in ring order: index + 1, wrapping to 0 after the last started slot
unsigned nextIndex = t->index + 1;
nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
RINOK_THREAD(Event_Set(&nextThread->canRead))
// We have started executing for new iteration (with next thread)
// And that next thread now is responsible for possible exit from decoding (threading_code)
}
}
// each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
// if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
// if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
// - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
// - otherwise we stop decoding and exit from ThreadFunc2()
// Don't change (finish) variable in the further code
// ---------- CODE ----------
inPrev = 0;
outPrev = 0;
inCodePos = 0;
outCodePos = 0;
if (res == SZ_OK && needCode && codeRes == SZ_OK)
{
BoolInt isStartBlock = True;
CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
for (;;)
{
size_t inSize;
int stop;
// size of the current chunk: first buffer holds inDataSize_Start bytes,
// later buffers hold a full p->inBufSize or whatever remains of inDataSize
if (isStartBlock)
inSize = inDataSize_Start;
else
{
UInt64 rem = inDataSize - inCodePos;
inSize = p->inBufSize;
if (inSize > rem)
inSize = (size_t)rem;
}
inCodePos += inSize;
stop = True;
codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
(const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
(inCodePos == inDataSize), // srcFinished
&inCodePos, &outCodePos, &stop);
if (codeRes != SZ_OK)
{
PRF(printf("\nCode Interrupt error = %x\n", codeRes));
// we interrupt only later blocks
MtDec_Interrupt(p, blockIndex);
break;
}
if (stop || inCodePos == inDataSize)
break;
{
// report progress made since the last report and check for error / interrupt
const UInt64 inDelta = inCodePos - inPrev;
const UInt64 outDelta = outCodePos - outPrev;
if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
{
// Sleep(1);
res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
if (res != SZ_OK || wasInterrupted)
break;
inPrev = inCodePos;
outPrev = outCodePos;
}
}
link = link->next;
isStartBlock = False;
}
}
// ---------- WRITE ----------
RINOK_THREAD(Event_Wait(&t->canWrite));
{
BoolInt isErrorMode = False;
BoolInt canRecode = True;
BoolInt needWriteToStream = needWrite;
if (p->exitThread) return 0; // it's never executed in normal cases
// publish this block's failure state into CMtDec, unless an earlier block already interrupted decoding
if (p->wasInterrupted)
wasInterrupted = True;
else
{
if (codeRes != SZ_OK) // || !needCode // check it !!!
{
p->wasInterrupted = True;
p->codeRes = codeRes;
if (codeRes == SZ_ERROR_MEM)
isAllocError = True;
}
if (threadingErrorSRes)
{
p->wasInterrupted = True;
p->threadingErrorSRes = threadingErrorSRes;
needWriteToStream = False;
}
if (isAllocError)
{
p->wasInterrupted = True;
p->isAllocError = True;
needWriteToStream = False;
}
if (overflow)
{
p->wasInterrupted = True;
p->overflow = True;
needWriteToStream = False;
}
}
if (needCode)
{
if (wasInterrupted)
{
// unsigned arithmetic: the deltas below then subtract what this block already reported
inCodePos = 0;
outCodePos = 0;
}
{
const UInt64 inDelta = inCodePos - inPrev;
const UInt64 outDelta = outCodePos - outPrev;
// if (inDelta != 0 || outDelta != 0)
res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
}
}
needContinue = (!finish);
// if (res == SZ_OK && needWrite && !wasInterrupted)
if (needWrite)
{
// p->inProcessed += inCodePos;
// Write() is called whenever Parse() was called for this block;
// its third argument tells the callback whether output should really be written
res = p->mtCallback->Write(p->mtCallbackObject, t->index,
res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
afterEndData, afterEndData_Size,
&needContinue,
&canRecode);
// res= E_INVALIDARG; // for test
PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
if (res != SZ_OK)
{
PRF(printf("\nWrite error = %d\n", res));
isErrorMode = True;
p->wasInterrupted = True;
}
if (res != SZ_OK
|| (!needContinue && !finish))
{
PRF(printf("\nWrite Interrupt error = %x\n", res));
MtDec_Interrupt(p, blockIndex);
}
}
// keep this slot's input buffers as "filled" data (consumed later through MtDec_Read())
// when the block was not cleanly decoded here or earlier slots are already queued
if (canRecode)
if (!needCode
|| res != SZ_OK
|| p->wasInterrupted
|| codeRes != SZ_OK
|| wasInterrupted
|| p->numFilledThreads != 0
|| isErrorMode)
{
if (p->numFilledThreads == 0)
p->filledThreadStart = t->index;
if (inDataSize != 0 || !finish)
{
t->inDataSize_Start = inDataSize_Start;
t->inDataSize = inDataSize;
p->numFilledThreads++;
}
PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
}
if (!finish)
{
// pairs with the Event_Set(&nextThread->canRead) done before the CODE phase
RINOK_THREAD(Event_Set(&nextThread->canWrite));
}
else
{
if (needContinue)
{
// we restore decoding with new iteration
RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
}
else
{
// we exit from decoding
if (t->index == 0)
return SZ_OK;
p->exitThread = True;
}
// wake slot 0: it either starts a new iteration or sees p->exitThread and returns
RINOK_THREAD(Event_Set(&p->threads[0].canRead));
}
}
}
}
#ifdef _WIN32
#define USE_ALLOCA
#endif
#ifdef USE_ALLOCA
#ifdef _WIN32
#include <malloc.h>
#else
#include <stdlib.h>
#endif
#endif
/*
  ThreadFunc1
  Runs the worker loop (ThreadFunc2) for the slot passed in (pp) and handles its result.
    - ThreadFunc2() == 0: returns the decoder-wide p->exitThreadWRes
      (which may have been set by another slot).
    - ThreadFunc2() != 0: a threading call failed. The first such code is stored in
      p->exitThreadWRes, p->exitThread is raised, both events of slot 0 are signaled
      so that slot 0 wakes up, the error is recorded in the progress object, and the
      failing code is returned.
*/
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
{
  CMtDecThread *t = (CMtDecThread *)pp;
  CMtDec *p;
  WRes res;

  res = ThreadFunc2(t);
  p = t->mtDec;

  if (res != 0)
  {
    /* unexpected failure of a system threading function */
    if (p->exitThreadWRes == 0)
      p->exitThreadWRes = res;

    PRF(printf("\nthread exit error = %d\n", res));

    p->exitThread = True;
    Event_Set(&p->threads[0].canRead);
    Event_Set(&p->threads[0].canWrite);
    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));

    return res;
  }

  return p->exitThreadWRes;
}
/*
  ThreadFunc
  Entry point given to Thread_Create(); also called directly by MtDec_Code() for slot 0.
  When USE_ALLOCA is defined, (index * 128) bytes are reserved on the stack with alloca()
  before ThreadFunc1() is entered, and the pointer is stored in the slot.
  Presumably this shifts the stack of each thread by a different amount so that hot
  stack data of different threads does not land on the same addresses modulo some
  power of two - TODO confirm. The function is marked MY_NO_INLINE.
*/
static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtDecThread *slot = (CMtDecThread *)pp;

  #ifdef USE_ALLOCA
  slot->allocaPtr = alloca(slot->index * 128);
  #endif

  return ThreadFunc1(pp);
}
/*
  MtDec_PrepareRead
  Releases memory that is not needed for reading back buffered input:
    - the cross block, when it exists but holds no pending bytes,
    - the input buffers of every thread slot that is not one of the "filled" slots.
  A slot (i) is treated as filled when its distance from p->filledThreadStart,
  counted forward in the ring of p->numStartedThreads slots, is below
  p->numFilledThreads. Slots with (i > p->numStartedThreads) are always released.
  Returns nonzero if buffered data remains to be read (filled slots or cross data).
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    int releasable = (i > p->numStartedThreads);

    if (!releasable)
    {
      /* compare the ring distance from the first filled slot with the number of filled slots */
      if (i >= p->filledThreadStart)
        releasable = (p->numFilledThreads <= i - p->filledThreadStart);
      else
        releasable = (p->numFilledThreads <= i + p->numStartedThreads - p->filledThreadStart);
    }

    if (releasable)
      MtDecThread_FreeInBufs(&p->threads[i]);
  }

  if (p->numFilledThreads != 0)
    return 1;
  return (p->crossStart != p->crossEnd);
}
/*
  MtDec_Read
  Hands out, chunk by chunk, the input that stayed buffered in the decoder:
  first the buffers of the "filled" thread slots (in ring order starting at
  p->filledThreadStart), then the pending bytes of the cross block.

    in:  *inLim != 0 means the chunk returned by the previous call has been consumed;
         its buffer is then released before the next chunk is selected.
         *inLim == 0 means nothing has been handed out yet for the current slot.
    out: *inLim = size of the returned chunk, or 0 when no data is left.

  Returns a pointer to the chunk, or NULL when no data is left (the cross block is
  freed in that case).
*/
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
{
  if (p->numFilledThreads != 0)
  {
    CMtDecThread *t = &p->threads[p->filledThreadStart];
    int slotAvailable = 1;

    if (*inLim != 0)
    {
      /* drop the buffer that was returned by the previous call */
      void *consumed = t->inBuf;
      void *following = ((CMtDecBufLink *)consumed)->next;
      ISzAlloc_Free(p->alloc, consumed);
      t->inBuf = following;

      if (t->inDataSize == 0)
      {
        /* this slot is exhausted: release it and move on to the next filled slot */
        MtDecThread_FreeInBufs(t);
        p->numFilledThreads--;
        if (p->numFilledThreads == 0)
          slotAvailable = 0;
        else
        {
          p->filledThreadStart++;
          if (p->filledThreadStart == p->numStartedThreads)
            p->filledThreadStart = 0;
          t = &p->threads[p->filledThreadStart];
        }
      }
    }

    if (slotAvailable)
    {
      /* the first chunk of a slot has its own size; later chunks are full buffers or the remainder */
      size_t lim = t->inDataSize_Start;
      if (lim == 0)
      {
        const UInt64 rem = t->inDataSize;
        lim = p->inBufSize;
        if (lim > rem)
          lim = (size_t)rem;
      }
      else
        t->inDataSize_Start = 0;

      t->inDataSize -= lim;
      *inLim = lim;
      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
    }
  }

  /* no filled slots (left): fall back to the cross block */
  {
    const size_t crossSize = p->crossEnd - p->crossStart;

    if (crossSize == 0)
    {
      *inLim = 0;
      if (p->crossBlock)
      {
        ISzAlloc_Free(p->alloc, p->crossBlock);
        p->crossBlock = NULL;
      }
      return NULL;
    }

    {
      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
      *inLim = crossSize;
      p->crossStart = 0;
      p->crossEnd = 0;
      return data;
    }
  }
}
/*
  MtDec_Construct
  Puts a CMtDec into its initial state:
    - default input buffer size of (1 << 18) bytes, thread limit 0,
    - no stream, progress callback, allocator or decoder callback attached,
    - no cross block and no filled slots,
    - every thread slot linked back to (p), numbered, without buffers, and with
      its events and thread handle in the "not created" state,
    - the progress critical section initialized.
  Fields not listed here (for example p->exitThread) are not set by this function.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  /* user-settable inputs */
  p->inBufSize = (size_t)1 << 18;
  p->numThreadsMax = 0;
  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  /* internal buffer bookkeeping */
  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;
  p->numFilledThreads = 0;
  p->allocatedBufsSize = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *slot = &p->threads[i];
    slot->mtDec = p;
    slot->index = i;
    slot->inBuf = NULL;
    Event_Construct(&slot->canRead);
    Event_Construct(&slot->canWrite);
    Thread_Construct(&slot->thread);
  }

  CriticalSection_Init(&p->mtProgress.cs);
}
/*
  MtDec_Free
  Stops all workers and releases all buffers of the decoder.
  p->exitThread is raised first, so that workers woken during shutdown leave
  their loop; then every slot is destructed (thread joined, events closed,
  input buffers freed) and finally the cross block is released.
*/
static void MtDec_Free(CMtDec *p)
{
  unsigned slot = 0;

  p->exitThread = True;

  while (slot < MTDEC__THREADS_MAX)
  {
    MtDecThread_Destruct(&p->threads[slot]);
    slot++;
  }

  if (!p->crossBlock)
    return;

  ISzAlloc_Free(p->alloc, p->crossBlock);
  p->crossBlock = NULL;
}
/*
  MtDec_Destruct
  Counterpart of MtDec_Construct(): frees threads, events and buffers,
  then deletes the progress critical section. The critical section is deleted
  last, after all workers have been joined by MtDec_Free().
*/
void MtDec_Destruct(CMtDec *p)
{
  /* workers, events, input buffers, cross block */
  MtDec_Free(p);

  /* lock that was created in MtDec_Construct() */
  CriticalSection_Delete(&p->mtProgress.cs);
}
/*
  MtDec_Code
  Runs one multi-threaded decoding session.

  Steps:
    1) resets all per-session state of (p),
    2) clamps the thread limit to MTDEC__THREADS_MAX,
    3) drops all buffers if p->inBufSize differs from the size they were allocated with,
    4) re-initializes the progress object,
    5) prepares the events of slot 0, signals both of them and runs the worker loop
       of slot 0 in the calling thread (ThreadFunc); further threads are started
       from inside that loop.

  Result:
    - If a threading call failed, its converted code is stored in p->threadingErrorSRes.
    - p->needContinue stays True only when the session ended because of an allocation
      error, a threading error or an overflow (and slot 0 did not return an error);
      in that case SZ_OK is returned. The field name suggests the caller is expected
      to continue by other means - TODO confirm against callers.
    - Otherwise p->needContinue is False and the converted threading result is
      returned (0 when no threading call failed). Read / code errors are left in
      p->readRes / p->codeRes and are not returned from here.
*/
SRes MtDec_Code(CMtDec *p)
{
  unsigned i;

  /* 1) per-session state */
  p->inProcessed = 0;

  p->blockIndex = 1; // it must be larger than not_defined index (0)

  p->isAllocError = False;
  p->overflow = False;
  p->threadingErrorSRes = SZ_OK;

  p->needContinue = True;

  p->readWasFinished = False;
  p->needInterrupt = False;
  p->interruptIndex = (UInt64)(Int64)-1;

  p->readProcessed = 0;
  p->readRes = SZ_OK;
  p->codeRes = SZ_OK;
  p->wasInterrupted = False;

  p->crossStart = 0;
  p->crossEnd = 0;

  p->filledThreadStart = 0;
  p->numFilledThreads = 0;

  /* 2) thread limit */
  {
    unsigned limit = p->numThreadsMax;
    if (limit > MTDEC__THREADS_MAX)
      limit = MTDEC__THREADS_MAX;
    p->numStartedThreads_Limit = limit;
    p->numStartedThreads = 0;
  }

  /* 3) buffers allocated for another block size cannot be reused */
  if (p->inBufSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTDEC__THREADS_MAX; i++)
    {
      CMtDecThread *slot = &p->threads[i];
      if (slot->inBuf)
        MtDecThread_FreeInBufs(slot);
    }

    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }

    p->allocatedBufsSize = p->inBufSize;
  }

  /* 4) progress */
  MtProgress_Init(&p->mtProgress, p->progress);

  p->exitThread = False;
  p->exitThreadWRes = 0;

  /* 5) run slot 0 in the calling thread */
  {
    WRes wres;
    WRes sres; /* NOTE(review): holds an SRes-style code although declared WRes; type kept as is */
    CMtDecThread *first = &p->threads[p->numStartedThreads++];

    wres = MtDecThread_CreateEvents(first);

    if (wres == 0)
      wres = Event_Set(&first->canWrite);

    if (wres == 0)
      wres = Event_Set(&first->canRead);

    if (wres == 0)
    {
      wres = ThreadFunc(first);
      if (wres != 0)
      {
        /* a threading call failed inside the worker loop: stop and join all workers */
        p->needContinue = False;
        MtDec_CloseThreads(p);
      }
    }

    sres = MY_SRes_HRESULT_FROM_WRes(wres);

    if (sres != 0)
      p->threadingErrorSRes = sres;

    /* needContinue survives only for alloc error / threading error / overflow */
    if (!p->isAllocError
        && p->threadingErrorSRes == SZ_OK
        && !p->overflow)
      p->needContinue = False;

    if (p->needContinue)
      return SZ_OK;

    return sres;
  }
}
#endif