/* C++ source  |  1138 lines  |  29.13 KB */

/* MtDec.c -- Multi-thread Decoder
2018-07-04 : Igor Pavlov : Public domain */

#include "Precomp.h"

// #define SHOW_DEBUG_INFO

// #include <stdio.h>

#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#endif

/* PRF(x) expands to x when SHOW_DEBUG_INFO is defined and to nothing otherwise,
   so statements wrapped in PRF() exist only in debug-trace builds. */
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

/* Debug trace helper: prints the string literal s followed by d
   (d is cast to unsigned and formatted with %d). Compiles to nothing
   unless SHOW_DEBUG_INFO is defined, because it is wrapped in PRF(). */
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))

#include "MtDec.h"

#ifndef _7ZIP_ST

/*
  Resets a progress tracker before a new run:
    - both byte counters are set to zero,
    - the sticky result code is cleared to SZ_OK,
    - the callback pointer is stored as given (it may be NULL).
  The critical section p->cs is not touched by this function.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->totalOutSize = 0;
  p->totalInSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}


/*
  Reports the current totals to the progress callback without taking p->cs.
  Nothing is reported when an error is already stored or no callback is set.
  A non-SZ_OK answer from the callback is recorded as SZ_ERROR_PROGRESS.
  Returns the (possibly updated) stored result code.
*/
SRes MtProgress_Progress_ST(CMtProgress *p)
{
  if (p->res != SZ_OK || !p->progress)
    return p->res;

  if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
    p->res = SZ_ERROR_PROGRESS;

  return p->res;
}


/*
  Adds (inSize, outSize) to the running totals and reports the new totals to
  the progress callback, all while holding p->cs.
  The callback is skipped when an error is already stored or no callback is
  set; a non-SZ_OK answer from it is recorded as SZ_ERROR_PROGRESS.
  Returns the stored result code as it is at the end of the locked section.
*/
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
{
  SRes status;

  CriticalSection_Enter(&p->cs);

  p->totalInSize += inSize;
  p->totalOutSize += outSize;

  if (p->res == SZ_OK && p->progress
      && ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
    p->res = SZ_ERROR_PROGRESS;

  status = p->res;

  CriticalSection_Leave(&p->cs);

  return status;
}


/* Returns a snapshot of the stored result code, read while holding p->cs. */
SRes MtProgress_GetError(CMtProgress *p)
{
  SRes snapshot;

  CriticalSection_Enter(&p->cs);
  snapshot = p->res;
  CriticalSection_Leave(&p->cs);

  return snapshot;
}


/*
  Stores res as the result code, but only if no error has been stored yet
  (the first error wins). The update is done while holding p->cs.
*/
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  SRes *const stored = &p->res;

  CriticalSection_Enter(&p->cs);
  if (*stored == SZ_OK)
  {
    *stored = res;
  }
  CriticalSection_Leave(&p->cs);
}


/* Alias of RINOK() used in this file for calls to threading/event functions
   (Event_Wait, Event_Set). RINOK is defined outside this file; presumably it
   returns from the enclosing function when x is non-zero - TODO confirm. */
#define RINOK_THREAD(x) RINOK(x)


/*
  Brings an event into the "not signaled" state:
    - an already created event is reset,
    - otherwise a new auto-reset event is created not signaled.
  Returns the WRes of whichever call was made.
*/
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  return Event_IsCreated(p)
      ? Event_Reset(p)
      : AutoResetEvent_CreateNotSignaled(p);
}



/*
  Header placed at the start of every input buffer allocation.
  Buffers are allocated as (MTDEC__LINK_DATA_OFFSET + inBufSize) bytes:
  this header first, then the data bytes.
*/
typedef struct
{
  void *next;     /* next buffer in a thread's singly linked inBuf chain, or NULL at the end */
  void *pad[3];   /* unused in this file; presumably pads the header to 4 pointers so data stays aligned - TODO confirm */
} CMtDecBufLink;

/* Byte offset from the start of an allocation to its data area (the header size). */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
/* Converts a pointer to a buffer header into a Byte pointer to the data that follows it. */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)



static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);


/*
  Prepares both per-thread events (canWrite first, then canRead) so that each
  exists and is not signaled.
  Returns 0 (SZ_OK) when both succeeded, otherwise the WRes of the first
  failing call; canRead is not touched if canWrite failed.
*/
static WRes MtDecThread_CreateEvents(CMtDecThread *t)
{
  WRes err = ArEvent_OptCreate_And_Reset(&t->canWrite);
  if (err != 0)
    return err;

  err = ArEvent_OptCreate_And_Reset(&t->canRead);
  if (err != 0)
    return err;

  return SZ_OK;
}


/*
  Makes thread slot t ready to run: prepares its two events and, unless the
  thread already exists, creates the thread with ThreadFunc as entry point
  and t as its argument.
  Returns SZ_OK on success, otherwise the WRes error converted with
  MY_SRes_HRESULT_FROM_WRes().
*/
static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
{
  WRes err = MtDecThread_CreateEvents(t);

  /* an existing thread is reused: only the events had to be reset */
  if (err == 0 && !(Thread_WasCreated(&t->thread)))
    err = Thread_Create(&t->thread, ThreadFunc, t);

  if (err == 0)
    return SZ_OK;

  return MY_SRes_HRESULT_FROM_WRes(err);
}


/*
  Releases the whole chain of input buffers owned by thread slot t.
  t->inBuf is set to NULL before the links are freed; every link is returned
  to the decoder's allocator (t->mtDec->alloc). Does nothing for an empty chain.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *cur = (CMtDecBufLink *)t->inBuf;

  if (!cur)
    return;

  t->inBuf = NULL;

  while (cur)
  {
    /* read the successor before the current link is freed */
    CMtDecBufLink *following = (CMtDecBufLink *)cur->next;
    ISzAlloc_Free(t->mtDec->alloc, cur);
    cur = following;
  }
}


/*
  Shuts down thread slot t.
  If the thread was created, both of its events are signaled so that a worker
  blocked in Event_Wait() wakes up, then the thread is waited for and closed.
  (The worker leaves ThreadFunc2() after waking only if the exitThread flag of
  the decoder is set; that flag is not set by this function.)
  Both events are closed in every case.
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  CEvent *const readEvent = &t->canRead;
  CEvent *const writeEvent = &t->canWrite;

  if (Thread_WasCreated(&t->thread))
  {
    /* we can disable the canWrite signal. There are no threads waiting canWrite in normal cases */
    Event_Set(writeEvent);
    Event_Set(readEvent);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(readEvent);
  Event_Close(writeEvent);
}

/* Shuts down every one of the MTDEC__THREADS_MAX thread slots, in index order. */
static void MtDec_CloseThreads(CMtDec *p)
{
  unsigned slot = 0;

  while (slot < MTDEC__THREADS_MAX)
  {
    MtDecThread_CloseThread(&p->threads[slot]);
    slot++;
  }
}

/*
  Fully tears down thread slot t: first the thread and its events are shut
  down, and only then is the slot's input buffer chain released.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  /* stop the worker before its buffers go away */
  MtDecThread_CloseThread(t);

  MtDecThread_FreeInBufs(t);
}



/*
  Reads from stream into data until the requested number of bytes has been
  read, the stream returns zero bytes, or a read fails.

  processedSize: in  - number of bytes requested
                 out - number of bytes actually stored in data
                       (also counted for the call that reported an error)

  Returns SZ_OK, or the first non-SZ_OK code returned by ISeqInStream_Read().
*/
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t remaining = *processedSize;
  size_t total = 0;
  SRes res = SZ_OK;

  *processedSize = 0;

  while (remaining != 0)
  {
    size_t got = remaining;

    res = ISeqInStream_Read(stream, data + total, &got);

    total += got;
    *processedSize = total;
    remaining -= got;

    /* stop on a read error, or when the stream has no more data */
    if (res != SZ_OK || got == 0)
      break;
  }

  return res;
}


/*
  Reads, under the progress critical section, both
    - the stored progress/error code (returned), and
    - whether the block with index interruptIndex is interrupted: an interrupt
      was requested and interruptIndex is greater than the recorded index
      (stored to *wasInterrupted).
*/
static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  CMtProgress *const prog = &p->mtProgress;
  SRes status;

  CriticalSection_Enter(&prog->cs);

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  status = prog->res;

  CriticalSection_Leave(&prog->cs);

  return status;
}

/*
  One locked step that combines progress reporting with the interrupt check:
    1) adds (inSize, outSize) to the totals,
    2) reports the totals to the progress callback (skipped when an error is
       already stored or no callback is set); a non-SZ_OK answer is recorded
       as SZ_ERROR_PROGRESS,
    3) stores to *wasInterrupted whether an interrupt was requested and
       interruptIndex is greater than the recorded interrupt index.
  Returns the stored progress/error code.
*/
static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  CMtProgress *const prog = &p->mtProgress;
  SRes status;

  CriticalSection_Enter(&prog->cs);

  prog->totalInSize += inSize;
  prog->totalOutSize += outSize;

  if (prog->res == SZ_OK && prog->progress
      && ICompressProgress_Progress(prog->progress, prog->totalInSize, prog->totalOutSize) != SZ_OK)
    prog->res = SZ_ERROR_PROGRESS;

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  status = prog->res;

  CriticalSection_Leave(&prog->cs);

  return status;
}

/*
  Requests an interrupt at block interruptIndex.
  The request is recorded when no interrupt is pending yet, or when the new
  index is smaller than the recorded one, so the smallest requested index is
  kept. The update is done under the progress critical section.
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);

  if (p->needInterrupt && interruptIndex >= p->interruptIndex)
  {
    /* an interrupt at the same or an earlier block is already recorded */
  }
  else
  {
    p->interruptIndex = interruptIndex;
    p->needInterrupt = True;
  }

  CriticalSection_Leave(&p->mtProgress.cs);
}

/*
  Returns a pointer to the data area of the cross block, allocating the block
  (header + p->inBufSize bytes, from p->alloc) on first use.
  Returns NULL if the allocation fails; p->crossBlock is left unchanged then.
*/
Byte *MtDec_GetCrossBuff(CMtDec *p)
{
  if (!p->crossBlock)
  {
    Byte *fresh = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    if (!fresh)
      return NULL;
    p->crossBlock = fresh;
  }

  return MTDEC__DATA_PTR_FROM_LINK(p->crossBlock);
}


/*
  ThreadFunc2() returns:
  0      - in all normal cases (even for stream error or memory allocation error)
  (!= 0) - WRes error return by system threading function
*/

/* Threshold, in bytes, used by ThreadFunc2(): progress is reported (and the
   error / interrupt state re-checked) once the input or output position has
   advanced by at least this much since the previous report.
   With the active value 1 that happens whenever any byte was processed. */
// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)

/*
  Worker loop of one decoder thread slot (thread 0 runs it inside the caller
  of MtDec_Code(), the other slots run it in their own threads).

  Every iteration of the outer loop handles one block in three phases:

    READ / PARSE - starts when t->canRead is signaled. Input is taken from the
                   cross block and/or p->inStream into the t->inBuf chain and
                   passed to mtCallback->Parse() until the end of the block is
                   decided. Then PreCode() may be called and, unless decoding
                   must finish, the next thread's canRead is signaled.
    CODE         - mtCallback->Code() is called for each filled buffer.
    WRITE        - starts when t->canWrite is signaled. Errors are published to
                   the shared state, mtCallback->Write() is called, and
                   canWrite is passed to the next thread (or to thread 0).

  Returns 0 when the loop is left normally, or (through RINOK_THREAD) the
  non-zero WRes of a failed Event_Wait() / Event_Set().
*/
static WRes ThreadFunc2(CMtDecThread *t)
{
  CMtDec *p = t->mtDec;

  PRF_STR_INT("ThreadFunc2", t->index);

  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);

  for (;;)
  {
    SRes res, codeRes;
    BoolInt wasInterrupted, isAllocError, overflow, finish;
    SRes threadingErrorSRes;
    BoolInt needCode, needWrite, needContinue;
    
    size_t inDataSize_Start;  // number of bytes of this block stored in the first buffer of the chain
    UInt64 inDataSize;        // total number of bytes of this block stored in the chain
    // UInt64 inDataSize_Full;
    
    UInt64 blockIndex;

    UInt64 inPrev = 0;
    UInt64 outPrev = 0;
    UInt64 inCodePos;
    UInt64 outCodePos;
    
    // bytes that follow the parsed end of data (MTDEC_PARSE_END); passed to Write()
    Byte *afterEndData = NULL;
    size_t afterEndData_Size = 0;

    BoolInt canCreateNewThread = False;
    // CMtDecCallbackInfo parse;
    CMtDecThread *nextThread;

    PRF_STR_INT("Event_Wait(&t->canRead)", t->index);

    // ---------- READ / PARSE ----------

    RINOK_THREAD(Event_Wait(&t->canRead));
    if (p->exitThread)
      return 0;

    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);

    // if (t->index == 3) return 19; // for test

    // NOTE(review): p->blockIndex and the other shared read-state fields are updated
    // here without a lock; presumably only the thread that received canRead runs this
    // section at any time - confirm.
    blockIndex = p->blockIndex++;

    // PRF(printf("\ncanRead\n"))

    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);

    finish = p->readWasFinished;
    needCode = False;
    needWrite = False;
    isAllocError = False;
    overflow = False;

    inDataSize_Start = 0;
    inDataSize = 0;
    // inDataSize_Full = 0;

    if (res == SZ_OK && !wasInterrupted)
    {
      // if (p->inStream)
      {
        CMtDecBufLink *prev = NULL;
        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        // number of bytes still pending in the cross block
        size_t crossSize = p->crossEnd - p->crossStart;

        PRF(printf("\ncrossSize = %d\n", crossSize));

        // one iteration per input buffer of the block
        for (;;)
        {
          if (!link)
          {
            // the existing chain is used up: append a new buffer
            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
            if (!link)
            {
              finish = True;
              // p->allocError_for_Read_BlockIndex = blockIndex;
              isAllocError = True;
              break;
            }
            link->next = NULL;
            if (prev)
            {
              // static unsigned g_num = 0;
              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
              prev->next = link;
            }
            else
              t->inBuf = (void *)link;
          }

          {
            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
            Byte *parseData = data;
            size_t size;

            if (crossSize != 0)
            {
              // pending cross-block bytes are parsed in place (from the cross block);
              // they are copied into (data) only after Parse() below
              inDataSize = crossSize;
              // inDataSize_Full = inDataSize;
              inDataSize_Start = crossSize;
              size = crossSize;
              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
            }
            else
            {
              size = p->inBufSize;
              
              res = FullRead(p->inStream, data, &size);
              
              // size = 10; // test

              inDataSize += size;
              // inDataSize_Full = inDataSize;
              if (!prev)
                inDataSize_Start = size;

              p->readProcessed += size;
              // a short read marks the end of the input stream
              finish = (size != p->inBufSize);
              if (finish)
                p->readWasFinished = True;
              
              // res = E_INVALIDARG; // test

              if (res != SZ_OK)
              {
                // PRF(printf("\nRead error = %d\n", res))
                // we want to decode all data before error
                // the error is kept in p->readRes and handled here like end of input
                p->readRes = res;
                // p->readError_BlockIndex = blockIndex;
                p->readWasFinished = True;
                finish = True;
                res = SZ_OK;
                // break;
              }

              if (inDataSize - inPrev >= MTDEC_ProgessStep)
              {
                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
                if (res != SZ_OK || wasInterrupted)
                  break;
                inPrev = inDataSize;
              }
            }

            {
              CMtDecCallbackInfo parse;

              parse.startCall = (prev == NULL);
              parse.src = parseData;
              parse.srcSize = size;
              parse.srcFinished = finish;
              parse.canCreateNewThread = True;

              // PRF(printf("\nParse size = %d\n", (unsigned)size))

              // After the call the code below reads parse.state, parse.canCreateNewThread
              // and parse.srcSize; srcSize is then used as the number of bytes that
              // belong to the current block.
              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);

              needWrite = True;
              canCreateNewThread = parse.canCreateNewThread;

              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
              
              if (
                  // parseRes != SZ_OK ||
                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
                  // ||
                  parse.state == MTDEC_PARSE_OVERFLOW
                  // || wasInterrupted
                  )
              {
                // Overflow or Parse error - switch from MT decoding to ST decoding
                finish = True;
                overflow = True;

                {
                  PRF(printf("\n Overflow"));
                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
                }
                
                // keep all pending cross bytes in the thread buffer and drop the cross block contents
                if (crossSize != 0)
                  memcpy(data, parseData, size);
                p->crossStart = 0;
                p->crossEnd = 0;
                break;
              }

              if (crossSize != 0)
              {
                // move the accepted cross bytes into the thread buffer
                memcpy(data, parseData, parse.srcSize);
                p->crossStart += parse.srcSize;
              }

              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
              {
                // we don't need to parse in current thread anymore

                if (parse.state == MTDEC_PARSE_END)
                  finish = True;

                needCode = True;
                // p->crossFinished = finish;

                if (parse.srcSize == size)
                {
                  // full parsed - no cross transfer
                  p->crossStart = 0;
                  p->crossEnd = 0;
                  break;
                }

                if (parse.state == MTDEC_PARSE_END)
                {
                  p->crossStart = 0;
                  p->crossEnd = 0;

                  if (crossSize != 0)
                    memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
                  afterEndData_Size = size - parse.srcSize;
                  afterEndData = parseData + parse.srcSize;

                  // we reduce data size to required bytes (parsed only)
                  inDataSize -= (size - parse.srcSize);
                  if (!prev)
                    inDataSize_Start = parse.srcSize;
                  break;
                }

                {
                  // partial parsed - need cross transfer
                  if (crossSize != 0)
                    inDataSize = parse.srcSize; // it's only parsed now
                  else
                  {
                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
                    Byte *cr = MtDec_GetCrossBuff(p);
                    if (!cr)
                    {
                      {
                        PRF(printf("\ncross alloc error error\n"));
                        // res = SZ_ERROR_MEM;
                        finish = True;
                        // p->allocError_for_Read_BlockIndex = blockIndex;
                        isAllocError = True;
                        break;
                      }
                    }

                    {
                      // the unparsed tail becomes the new cross block contents
                      size_t crSize = size - parse.srcSize;
                      inDataSize -= crSize;
                      p->crossEnd = crSize;
                      p->crossStart = 0;
                      memcpy(cr, parseData + parse.srcSize, crSize);
                    }
                  }

                  // inDataSize_Full = inDataSize;
                  if (!prev)
                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)

                  // data for a following block is pending in the cross block, so reading continues
                  finish = False;
                  break;
                }
              }

              // MTDEC_PARSE_CONTINUE with more input expected: Parse() must have accepted everything
              if (parse.srcSize != size)
              {
                res = SZ_ERROR_FAIL;
                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
                break;
              }
            }
          }
          
          prev = link;
          link = link->next;

          // the cross block was consumed by the first buffer; further buffers read from the stream
          if (crossSize != 0)
          {
            crossSize = 0;
            p->crossStart = 0;
            p->crossEnd = 0;
          }
        }
      }

      if (res == SZ_OK)
        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    }

    codeRes = SZ_OK;

    if (res == SZ_OK && needCode && !wasInterrupted)
    {
      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
      if (codeRes != SZ_OK)
      {
        needCode = False;
        finish = True;
        // SZ_ERROR_MEM is expected error here.
        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
      }
    }
    
    if (res != SZ_OK || wasInterrupted)
      finish = True;
    
    nextThread = NULL;
    threadingErrorSRes = SZ_OK;

    if (!finish)
    {
      // start one more worker if the limit allows it and Parse() permitted it
      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
      {
        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
        if (res2 == SZ_OK)
        {
          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
          p->numStartedThreads++;
        }
        else
        {
          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
          if (p->numStartedThreads == 1)
          {
            // if only one thread is possible, we leave multi-threading code
            finish = True;
            needCode = False;
            threadingErrorSRes = res2;
          }
          else
            p->numStartedThreads_Limit = p->numStartedThreads;
        }
      }
      
      if (!finish)
      {
        // threads are used in ring order: the slot after the last started one is slot 0
        unsigned nextIndex = t->index + 1;
        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
        RINOK_THREAD(Event_Set(&nextThread->canRead))
        // We have started executing for new iteration (with next thread)
        // And that next thread now is responsible for possible exit from decoding (threading_code)
      }
    }

    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    //   - otherwise we stop decoding and exit from ThreadFunc2()

    // Don't change (finish) variable in the further code


    // ---------- CODE ----------

    inPrev = 0;
    outPrev = 0;
    inCodePos = 0;
    outCodePos = 0;

    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    {
      BoolInt isStartBlock = True;
      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

      // one Code() call per buffer of the chain
      for (;;)
      {
        size_t inSize;
        int stop;

        // the first buffer holds inDataSize_Start bytes, later ones up to inBufSize
        if (isStartBlock)
          inSize = inDataSize_Start;
        else
        {
          UInt64 rem = inDataSize - inCodePos;
          inSize = p->inBufSize;
          if (inSize > rem)
            inSize = (size_t)rem;
        }

        inCodePos += inSize;
        stop = True;

        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
            (inCodePos == inDataSize), // srcFinished
            &inCodePos, &outCodePos, &stop);
        
        if (codeRes != SZ_OK)
        {
          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
          // we interrupt only later blocks
          MtDec_Interrupt(p, blockIndex);
          break;
        }

        if (stop || inCodePos == inDataSize)
          break;
  
        {
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
          {
            // Sleep(1);
            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
            if (res != SZ_OK || wasInterrupted)
              break;
            inPrev = inCodePos;
            outPrev = outCodePos;
          }
        }

        link = link->next;
        isStartBlock = False;
      }
    }


    // ---------- WRITE ----------
   
    RINOK_THREAD(Event_Wait(&t->canWrite));

  {
    BoolInt isErrorMode = False;
    BoolInt canRecode = True;
    BoolInt needWriteToStream = needWrite;

    if (p->exitThread) return 0; // it's never executed in normal cases

    // publish this block's failure state to the shared decoder state,
    // unless an earlier block already recorded an interruption
    if (p->wasInterrupted)
      wasInterrupted = True;
    else
    {
      if (codeRes != SZ_OK) // || !needCode // check it !!!
      {
        p->wasInterrupted = True;
        p->codeRes = codeRes;
        if (codeRes == SZ_ERROR_MEM)
          isAllocError = True;
      }
      
      if (threadingErrorSRes)
      {
        p->wasInterrupted = True;
        p->threadingErrorSRes = threadingErrorSRes;
        needWriteToStream = False;
      }
      if (isAllocError)
      {
        p->wasInterrupted = True;
        p->isAllocError = True;
        needWriteToStream = False;
      }
      if (overflow)
      {
        p->wasInterrupted = True;
        p->overflow = True;
        needWriteToStream = False;
      }
    }

    if (needCode)
    {
      if (wasInterrupted)
      {
        // with positions reset to 0 the unsigned deltas below wrap around, which in
        // effect subtracts the progress already reported for this block from the totals
        inCodePos = 0;
        outCodePos = 0;
      }
      {
        const UInt64 inDelta = inCodePos - inPrev;
        const UInt64 outDelta = outCodePos - outPrev;
        // if (inDelta != 0 || outDelta != 0)
        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
      }
    }

    needContinue = (!finish);

    // if (res == SZ_OK && needWrite && !wasInterrupted)
    if (needWrite)
    {
      // p->inProcessed += inCodePos;

      // Write() is called even when nothing must be written (third argument is then 0);
      // it may change needContinue and canRecode
      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
          afterEndData, afterEndData_Size,
          &needContinue,
          &canRecode);
      
      // res= E_INVALIDARG; // for test

      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));

      if (res != SZ_OK)
      {
        PRF(printf("\nWrite error = %d\n", res));
        isErrorMode = True;
        p->wasInterrupted = True;
      }
      if (res != SZ_OK
          || (!needContinue && !finish))
      {
        PRF(printf("\nWrite Interrupt error = %x\n", res));
        MtDec_Interrupt(p, blockIndex);
      }
    }

    // The block was not completely decoded and written here: keep its input buffers and
    // sizes in the thread slot and count the slot as "filled"
    // (MtDec_Read() later returns that data).
    if (canRecode)
    if (!needCode
        || res != SZ_OK
        || p->wasInterrupted
        || codeRes != SZ_OK
        || wasInterrupted
        || p->numFilledThreads != 0
        || isErrorMode)
    {
      if (p->numFilledThreads == 0)
        p->filledThreadStart = t->index;
      if (inDataSize != 0 || !finish)
      {
        t->inDataSize_Start = inDataSize_Start;
        t->inDataSize = inDataSize;
        p->numFilledThreads++;
      }
      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
    }

    if (!finish)
    {
      // normal hand-over: the next thread may enter its WRITE phase
      RINOK_THREAD(Event_Set(&nextThread->canWrite));
    }
    else
    {
      if (needContinue)
      {
        // we restore decoding with new iteration
        RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
      }
      else
      {
        // we exit from decoding
        if (t->index == 0)
          return SZ_OK;
        // thread 0 is woken below and returns because exitThread is set
        p->exitThread = True;
      }
      RINOK_THREAD(Event_Set(&p->threads[0].canRead));
    }
  }
  }
}

#ifdef _WIN32
#define USE_ALLOCA
#endif

#ifdef USE_ALLOCA
#ifdef _WIN32
#include <malloc.h>
#else
#include <stdlib.h>
#endif
#endif


/*
  Runs the worker loop for the thread slot passed in pp and handles its result.

  If ThreadFunc2() returns 0, the decoder's exitThreadWRes is returned.
  Otherwise (unexpected failure of a threading function):
    - the error is stored in exitThreadWRes unless one is stored already,
    - exitThread is set and both events of thread 0 are signaled,
    - the error, converted with MY_SRes_HRESULT_FROM_WRes(), is recorded via
      MtProgress_SetError(),
    - the error is returned.
*/
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
{
  CMtDecThread *const self = (CMtDecThread *)pp;
  CMtDec *dec;
  WRes wres;

  // fprintf(stdout, "\n%d = %p\n", t->index, &t);

  wres = ThreadFunc2(self);
  dec = self->mtDec;

  if (wres != 0)
  {
    // it's unexpected situation for some threading function error
    if (dec->exitThreadWRes == 0)
      dec->exitThreadWRes = wres;
    PRF(printf("\nthread exit error = %d\n", wres));
    dec->exitThread = True;
    Event_Set(&dec->threads[0].canRead);
    Event_Set(&dec->threads[0].canWrite);
    MtProgress_SetError(&dec->mtProgress, MY_SRes_HRESULT_FROM_WRes(wres));
    return wres;
  }

  return dec->exitThreadWRes;
}

/*
  Thread entry point (also called directly by MtDec_Code() for thread 0).
  When USE_ALLOCA is defined, (index * 128) bytes are reserved with alloca()
  in this function's frame before the real work starts in ThreadFunc1(); the
  pointer is stored in the thread slot.
  NOTE(review): presumably this shifts each thread's stack by a different
  amount, and MY_NO_INLINE keeps the reservation in a separate frame - confirm.
*/
static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
  #ifdef USE_ALLOCA
  CMtDecThread *thread = (CMtDecThread *)pp;
  thread->allocaPtr = alloca(thread->index * 128);
  #endif

  return ThreadFunc1(pp);
}


/*
  Releases memory that holds no pending input:
    - the cross block, if it is allocated but empty (crossStart == crossEnd),
    - the input buffers of every thread slot that is not one of the
      numFilledThreads "filled" slots, counted in ring order (modulo
      numStartedThreads) from filledThreadStart; slots with index greater
      than numStartedThreads are always released.

  Returns non-zero if any filled thread slot exists or the cross block still
  holds bytes, otherwise 0.
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned slot;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (slot = 0; slot < MTDEC__THREADS_MAX; slot++)
  {
    if (slot <= p->numStartedThreads)
    {
      /* keep the slot if its ring distance from filledThreadStart is inside the filled range */
      if (p->numFilledThreads >
            (slot >= p->filledThreadStart ?
              slot - p->filledThreadStart :
              slot + p->numStartedThreads - p->filledThreadStart))
        continue;
    }
    MtDecThread_FreeInBufs(&p->threads[slot]);
  }

  if (p->numFilledThreads != 0)
    return 1;
  return (p->crossStart != p->crossEnd);
}

    
/*
  Returns the next chunk of pending input and stores its size in *inLim.

  *inLim on entry: non-zero means the chunk returned by the previous call has
  been consumed, so its buffer is freed first (and the next filled thread slot
  is selected when the current one has no data left).

  Order of sources:
    1) the buffer chains of the filled thread slots, starting at
       filledThreadStart and wrapping at numStartedThreads;
    2) the remaining bytes of the cross block (returned in one piece; the
       cross range is then marked empty).
  When nothing is left, *inLim is set to 0, the cross block is freed and
  NULL is returned.
*/
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
{
  if (p->numFilledThreads != 0)
  {
    CMtDecThread *cur = &p->threads[p->filledThreadStart];
    BoolInt haveThread = True;

    if (*inLim != 0)
    {
      /* drop the head buffer: it was handed out by the previous call */
      CMtDecBufLink *used = (CMtDecBufLink *)cur->inBuf;
      void *rest = used->next;
      ISzAlloc_Free(p->alloc, used);
      cur->inBuf = rest;

      if (cur->inDataSize == 0)
      {
        /* this slot is exhausted: release it and move to the next filled slot */
        MtDecThread_FreeInBufs(cur);
        p->numFilledThreads--;
        if (p->numFilledThreads == 0)
          haveThread = False;
        else
        {
          p->filledThreadStart++;
          if (p->filledThreadStart == p->numStartedThreads)
            p->filledThreadStart = 0;
          cur = &p->threads[p->filledThreadStart];
        }
      }
    }

    if (haveThread)
    {
      /* the first buffer of a slot holds inDataSize_Start bytes, later ones up to inBufSize */
      size_t chunk = cur->inDataSize_Start;
      if (chunk == 0)
      {
        const UInt64 left = cur->inDataSize;
        chunk = p->inBufSize;
        if (chunk > left)
          chunk = (size_t)left;
      }
      else
        cur->inDataSize_Start = 0;

      cur->inDataSize -= chunk;
      *inLim = chunk;
      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(cur->inBuf);
    }
  }

  {
    const size_t pending = p->crossEnd - p->crossStart;

    if (pending == 0)
    {
      *inLim = 0;
      if (p->crossBlock)
      {
        ISzAlloc_Free(p->alloc, p->crossBlock);
        p->crossBlock = NULL;
      }
      return NULL;
    }

    {
      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
      *inLim = pending;
      p->crossStart = 0;
      p->crossEnd = 0;
      return data;
    }
  }
}


/*
  Puts a CMtDec object into its initial state:
    - input buffer size defaults to 256 KiB (1 << 18), thread limit to 0,
    - all stream / callback / allocator pointers are NULL,
    - no cross block, no filled threads, no buffers allocated,
    - every thread slot gets its back pointer and index, an empty buffer
      chain, and constructed (not yet created) events and thread object,
    - the progress critical section is initialized.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned slot;

  /* external objects are supplied later by the caller */
  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->numThreadsMax = 0;
  p->inBufSize = (size_t)1 << 18;
  p->allocatedBufsSize = 0;

  // p->inData = NULL;
  // p->inDataSize = 0;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  p->numFilledThreads = 0;

  slot = 0;
  while (slot < MTDEC__THREADS_MAX)
  {
    CMtDecThread *th = &p->threads[slot];
    th->mtDec = p;
    th->index = slot;
    th->inBuf = NULL;
    Event_Construct(&th->canRead);
    Event_Construct(&th->canWrite);
    Thread_Construct(&th->thread);
    slot++;
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}


/*
  Releases everything the decoder owns: sets exitThread so that woken workers
  leave their loop, tears down every thread slot (thread, events, input
  buffers) and frees the cross block if one is allocated.
*/
static void MtDec_Free(CMtDec *p)
{
  unsigned slot = 0;

  p->exitThread = True;

  while (slot < MTDEC__THREADS_MAX)
  {
    MtDecThread_Destruct(&p->threads[slot]);
    slot++;
  }

  // Event_Close(&p->finishedEvent);

  if (!p->crossBlock)
    return;

  ISzAlloc_Free(p->alloc, p->crossBlock);
  p->crossBlock = NULL;
}


/*
  Destroys a CMtDec object: all threads, events and buffers are released
  first, and the progress critical section is deleted last.
*/
void MtDec_Destruct(CMtDec *p)
{
  /* workers must be gone before the lock they use is deleted */
  MtDec_Free(p);
  CriticalSection_Delete(&p->mtProgress.cs);
}


/*
  Runs one multi-threaded decoding pass.

  The function resets all per-run state, drops cached buffers if inBufSize
  changed since they were allocated, and then runs the worker loop of thread 0
  in the calling thread (further threads are started from inside that loop).

  Result:
    - p->needContinue stays True only if an allocation error, a threading
      error or an overflow was recorded; SZ_OK is returned in that case
      (unless the thread-0 worker itself returned an error, which clears
      needContinue and closes all threads).
    - Otherwise p->needContinue is False and the return value is the threading
      error converted to SRes (SZ_OK when there was none).
  Read / decode results are left in p->readRes, p->codeRes and related fields.
*/
SRes MtDec_Code(CMtDec *p)
{
  unsigned i;

  p->inProcessed = 0;

  p->blockIndex = 1; // it must be larger than not_defined index (0)
  p->isAllocError = False;
  p->overflow = False;
  p->threadingErrorSRes = SZ_OK;

  p->needContinue = True;

  p->readWasFinished = False;
  p->needInterrupt = False;
  p->interruptIndex = (UInt64)(Int64)-1;

  p->readProcessed = 0;
  p->readRes = SZ_OK;
  p->codeRes = SZ_OK;
  p->wasInterrupted = False;

  p->crossStart = 0;
  p->crossEnd = 0;

  p->filledThreadStart = 0;
  p->numFilledThreads = 0;

  {
    // the number of threads that may be started is capped by the slot count
    unsigned numThreads = p->numThreadsMax;
    if (numThreads > MTDEC__THREADS_MAX)
      numThreads = MTDEC__THREADS_MAX;
    p->numStartedThreads_Limit = numThreads;
    p->numStartedThreads = 0;
  }

  if (p->inBufSize != p->allocatedBufsSize)
  {
    // buffers kept from a previous run have a different size: release them
    for (i = 0; i < MTDEC__THREADS_MAX; i++)
    {
      CMtDecThread *t = &p->threads[i];
      if (t->inBuf)
        MtDecThread_FreeInBufs(t);
    }
    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }

    p->allocatedBufsSize = p->inBufSize;
  }

  MtProgress_Init(&p->mtProgress, p->progress);

  // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
  p->exitThread = False;
  p->exitThreadWRes = 0;

  {
    WRes wres;
    SRes sres; // holds an SRes (converted from wres), so it is declared as SRes
    CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];

    // Thread 0 needs no system thread: its events are prepared and signaled,
    // and its worker loop is executed right here in the calling thread.
    // wres = MtDecThread_CreateAndStart(nextThread);
    wres = MtDecThread_CreateEvents(nextThread);
    if (wres == 0)
      wres = Event_Set(&nextThread->canWrite);
    if (wres == 0)
      wres = Event_Set(&nextThread->canRead);
    if (wres == 0)
    {
      wres = ThreadFunc(nextThread);
      if (wres != 0)
      {
        // a threading function failed inside the worker loop
        p->needContinue = False;
        MtDec_CloseThreads(p);
      }
    }

    // wres = 17; // for test
    // wres = Event_Wait(&p->finishedEvent);

    sres = MY_SRes_HRESULT_FROM_WRes(wres);

    if (sres != 0)
      p->threadingErrorSRes = sres;

    // needContinue survives only for the conditions listed here
    // (p->mtc.codeRes == SZ_ERROR_MEM was considered too, but is not checked)
    if (!(p->isAllocError
        || p->threadingErrorSRes != SZ_OK
        || p->overflow))
      p->needContinue = False;

    if (p->needContinue)
      return SZ_OK;

    // if (sres != SZ_OK)
      return sres;
    // return E_FAIL;
  }
}

#endif