/* C++ source file  |  602 lines  |  13.93 KB */

/* MtCoder.c -- Multi-thread Coder
2018-07-04 : Igor Pavlov : Public domain */

#include "Precomp.h"

#include "MtCoder.h"

#ifndef _7ZIP_ST

/*
  ICompressProgress::Progress handler of CMtProgressThunk.

  The caller passes cumulative sizes. The thunk turns each of them into an
  increment relative to the value it stored on the previous call, remembers
  the new value, and forwards the increments to MtProgress_ProgressAdd().

  A size equal to (UInt64)(Int64)-1 is skipped: its increment is 0 and the
  stored value is left untouched.

  Returns the result of MtProgress_ProgressAdd().
*/
SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
{
  const UInt64 kNoValue = (UInt64)(Int64)-1;
  CMtProgressThunk *self = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
  UInt64 inDelta;
  UInt64 outDelta;

  if (inSize == kNoValue)
    inDelta = 0;
  else
  {
    inDelta = inSize - self->inSize;
    self->inSize = inSize;
  }

  if (outSize == kNoValue)
    outDelta = 0;
  else
  {
    outDelta = outSize - self->outSize;
    self->outSize = outSize;
  }

  return MtProgress_ProgressAdd(self->mtProgress, inDelta, outDelta);
}


/* Installs MtProgressThunk_Progress as the Progress handler in the thunk's vtable. */
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
{
  ICompressProgress *vtbl = &p->vt;
  vtbl->Progress = MtProgressThunk_Progress;
}



/* Returns SZ_ERROR_THREAD from the enclosing function when (x) is non-zero.
   The expansion is a braced block, so the call sites in this file are written
   both with and without a trailing semicolon. */
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }


/*
  Puts an event into the non-signaled state for a new run:
  an event that already exists is reset, otherwise a new auto-reset event
  is created non-signaled.
  Returns the WRes of whichever call was made.
*/
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  return Event_IsCreated(p)
      ? Event_Reset(p)
      : AutoResetEvent_CreateNotSignaled(p);
}


/* Forward declaration: MtCoderThread_CreateAndStart() passes ThreadFunc to Thread_Create()
   before the function body appears further down in this file. */
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);


/*
  Makes thread slot (t) run one more pass of ThreadFunc():
    - creates or resets t->startEvent,
    - clears the stop flag,
    - creates the system thread if the slot does not have one yet,
    - signals t->startEvent.

  Returns SZ_OK on success; otherwise the first failing WRes converted
  with MY_SRes_HRESULT_FROM_WRes().
*/
static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
{
  WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
  if (wres != 0)
    return MY_SRes_HRESULT_FROM_WRes(wres);

  t->stop = False;

  if (!Thread_WasCreated(&t->thread))
  {
    wres = Thread_Create(&t->thread, ThreadFunc, t);
    if (wres != 0)
      return MY_SRes_HRESULT_FROM_WRes(wres);
  }

  wres = Event_Set(&t->startEvent);
  if (wres != 0)
    return MY_SRes_HRESULT_FROM_WRes(wres);

  return SZ_OK;
}


/*
  Releases everything owned by thread slot (t):
    - if a system thread exists, raises the stop flag, signals startEvent
      (ThreadFunc() returns when it sees the flag), waits for the thread
      and closes its handle,
    - closes startEvent,
    - frees the input buffer through the coder's allocBig allocator.
*/
static void MtCoderThread_Destruct(CMtCoderThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    t->stop = 1;
    Event_Set(&t->startEvent);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->startEvent);

  if (t->inBuf == NULL)
    return;

  ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
  t->inBuf = NULL;
}



/*
  Reads from (stream) into (data) until the requested amount has been read,
  the stream returns 0 bytes, or a read fails.

  in:  *processedSize - number of bytes requested
  out: *processedSize - number of bytes actually stored in (data);
                        bytes reported by a failing read call are counted too

  Returns SZ_OK, or the error code of the failing ISeqInStream_Read() call.
*/
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t remaining = *processedSize;
  *processedSize = 0;

  for (;;)
  {
    size_t chunk;
    SRes res;

    if (remaining == 0)
      break;

    chunk = remaining;
    res = ISeqInStream_Read(stream, data, &chunk);

    /* the counters are updated before the error check on purpose */
    *processedSize += chunk;
    data += chunk;
    remaining -= chunk;

    RINOK(res);

    if (chunk == 0)
      break;
  }

  return SZ_OK;
}


/*
  ThreadFunc2() - one run of a worker thread; each loop iteration handles one block.

  Per iteration:
    1) wait for mtc->readEvent (created in this file as an auto-reset event,
       so it is handed from thread to thread),
    2) get up to mtc->blockSize bytes of input: read from mtc->inStream into
       t->inBuf, or take a slice of mtc->inData,
    3) wait on mtc->blocksSemaphore and take the next slot (bi) of mtc->blocks[],
    4) optionally start one more worker thread, then set mtc->readEvent again,
    5) take a buffer index from the free list and call mtc->mtCallback->Code(),
    6) store the result in mtc->blocks[bi]; then either signal
       mtc->writeEvents[bi] (MTCODER__USE_WRITE_THREAD) or, if this block is
       the next one to be written, call mtc->mtCallback->Write() for it and
       for all following blocks that are already marked ready.

  ThreadFunc2() returns:
  SZ_OK           - in all normal cases (even for stream error or memory allocation error)
  SZ_ERROR_THREAD - in case of failure in system synch function
*/

static SRes ThreadFunc2(CMtCoderThread *t)
{
  CMtCoder *mtc = t->mtCoder;

  for (;;)
  {
    unsigned bi;          /* slot in mtc->blocks[] used for this block */
    SRes res;             /* result for this block (read / code / write) */
    SRes res2;            /* SZ_ERROR_THREAD if Semaphore_Wait() failed */
    BoolInt finished;     /* True if no more blocks must be read after this one */
    unsigned bufIndex;    /* buffer index passed to Code() / Write(); (unsigned)(int)-1 if none */
    size_t size;          /* number of input bytes in this block */
    const Byte *inData;   /* input pointer for the in-memory (no inStream) mode */
    UInt64 readProcessed = 0;
    
    RINOK_THREAD(Event_Wait(&mtc->readEvent))

    /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */

    if (mtc->stopReading)
    {
      /* reading is over: pass readEvent on to the next waiting thread and leave */
      return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    }

    /* do not read new data if an error was already registered */
    res = MtProgress_GetError(&mtc->mtProgress);
    
    size = 0;
    inData = NULL;
    finished = True;

    if (res == SZ_OK)
    {
      size = mtc->blockSize;
      if (mtc->inStream)
      {
        /* stream mode: the per-thread input buffer is allocated on first use */
        if (!t->inBuf)
        {
          t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
          if (!t->inBuf)
            res = SZ_ERROR_MEM;
        }
        if (res == SZ_OK)
        {
          /* FullRead() replaces (size) with the number of bytes actually read */
          res = FullRead(mtc->inStream, t->inBuf, &size);
          readProcessed = mtc->readProcessed + size;
          mtc->readProcessed = readProcessed;
        }
        if (res != SZ_OK)
        {
          mtc->readRes = res;
          /* after reading error - we can stop encoding of previous blocks */
          MtProgress_SetError(&mtc->mtProgress, res);
        }
        else
          /* a short block means that the stream returned less than blockSize bytes */
          finished = (size != mtc->blockSize);
      }
      else
      {
        /* memory mode: the block is a slice of mtc->inData, limited by the remaining size */
        size_t rem;
        readProcessed = mtc->readProcessed;
        rem = mtc->inDataSize - (size_t)readProcessed;
        if (size > rem)
          size = rem;
        inData = mtc->inData + (size_t)readProcessed;
        readProcessed += size;
        mtc->readProcessed = readProcessed;
        finished = (mtc->inDataSize == (size_t)readProcessed);
      }
    }

    /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */

    res2 = SZ_OK;

    if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
    {
      res2 = SZ_ERROR_THREAD;
      if (res == SZ_OK)
      {
        res = res2;
        // MtProgress_SetError(&mtc->mtProgress, res);
      }
    }

    /* take the current slot and advance mtc->blockIndex cyclically in [0, numBlocksMax) */
    bi = mtc->blockIndex;

    if (++mtc->blockIndex >= mtc->numBlocksMax)
      mtc->blockIndex = 0;

    bufIndex = (unsigned)(int)-1;

    /* check again for an error registered in the meantime */
    if (res == SZ_OK)
      res = MtProgress_GetError(&mtc->mtProgress);

    if (res != SZ_OK)
      finished = True;

    if (!finished)
    {
      /* more data is expected: start one more thread if the limit allows it
         and the amount read so far differs from expectedDataSize */
      if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
          && mtc->expectedDataSize != readProcessed)
      {
        res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
        if (res == SZ_OK)
          mtc->numStartedThreads++;
        else
        {
          MtProgress_SetError(&mtc->mtProgress, res);
          finished = True;
        }
      }
    }

    /* threads that get readEvent after this point return at the stopReading check above */
    if (finished)
      mtc->stopReading = True;

    /* reading part is done: let the next thread read */
    RINOK_THREAD(Event_Set(&mtc->readEvent))

    if (res2 != SZ_OK)
      return res2;

    if (res == SZ_OK)
    {
      /* take a buffer index from the head of the free list */
      CriticalSection_Enter(&mtc->cs);
      bufIndex = mtc->freeBlockHead;
      mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
      CriticalSection_Leave(&mtc->cs);
      
      res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
          mtc->inStream ? t->inBuf : inData, size, finished);
      
      // MtProgress_Reinit(&mtc->mtProgress, t->index);

      if (res != SZ_OK)
        MtProgress_SetError(&mtc->mtProgress, res);
    }

    /* publish the result of this block in its slot */
    {
      CMtCoderBlock *block = &mtc->blocks[bi];
      block->res = res;
      block->bufIndex = bufIndex;
      block->finished = finished;
    }
    
    #ifdef MTCODER__USE_WRITE_THREAD
      /* the loop in MtCoder_Code() waits for this event and processes the slot */
      RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
    #else
    {
      unsigned wi;
      {
        /* If this block is the next one to be written (writeIndex == bi), this thread
           becomes the writer and writeIndex is set to (unsigned)(int)-1 while it writes.
           Otherwise the block is only marked as ready. */
        CriticalSection_Enter(&mtc->cs);
        wi = mtc->writeIndex;
        if (wi == bi)
          mtc->writeIndex = (unsigned)(int)-1;
        else
          mtc->ReadyBlocks[bi] = True;
        CriticalSection_Leave(&mtc->cs);
      }

      if (wi != bi)
      {
        /* not the writer: stop on error / last block, otherwise go for the next block */
        if (res != SZ_OK || finished)
          return 0;
        continue;
      }

      /* an earlier write error prevents further writing */
      if (mtc->writeRes != SZ_OK)
        res = mtc->writeRes;

      /* writer loop: handles block (wi) and then every following block that is already ready */
      for (;;)
      {
        if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
        {
          res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
          if (res != SZ_OK)
          {
            mtc->writeRes = res;
            MtProgress_SetError(&mtc->mtProgress, res);
          }
        }

        /* advance to the next slot cyclically */
        if (++wi >= mtc->numBlocksMax)
          wi = 0;
        {
          BoolInt isReady;

          CriticalSection_Enter(&mtc->cs);
          
          /* return the buffer index to the free list */
          if (bufIndex != (unsigned)(int)-1)
          {
            mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
            mtc->freeBlockHead = bufIndex;
          }
          
          isReady = mtc->ReadyBlocks[wi];
          
          /* if the next block is ready, this thread handles it too;
             otherwise writeIndex is set to it, so that the thread that
             finishes that block becomes the writer */
          if (isReady)
            mtc->ReadyBlocks[wi] = False;
          else
            mtc->writeIndex = wi;
          
          CriticalSection_Leave(&mtc->cs);

          /* one slot was processed: release one count of blocksSemaphore */
          RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))

          if (!isReady)
            break;
        }

        /* load the state of the next ready block; the first error is kept in (res) */
        {
          CMtCoderBlock *block = &mtc->blocks[wi];
          if (res == SZ_OK && block->res != SZ_OK)
            res = block->res;
          bufIndex = block->bufIndex;
          finished = block->finished;
        }
      }
    }
    #endif
      
    /* in the writer case (finished) and (res) describe the last block handled in the writer loop */
    if (finished || res != SZ_OK)
      return 0;
  }
}


/*
  Entry point of every worker thread. (pp) is the CMtCoderThread slot.

  The thread sleeps on startEvent. Each time it is woken up:
    - if the stop flag is set, it returns 0,
    - otherwise it runs ThreadFunc2() once and registers a non-SZ_OK result
      with MtProgress_SetError().
  Without MTCODER__USE_WRITE_THREAD it then increments numFinishedThreads and
  sets finishedEvent when that counter equals numStartedThreads.

  Returns SZ_ERROR_THREAD if waiting on startEvent or setting finishedEvent fails.
*/
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtCoderThread *self = (CMtCoderThread *)pp;

  while (Event_Wait(&self->startEvent) == 0)
  {
    CMtCoder *coder;
    SRes runRes;

    if (self->stop)
      return 0;

    runRes = ThreadFunc2(self);
    coder = self->mtCoder;

    if (runRes != SZ_OK)
      MtProgress_SetError(&coder->mtProgress, runRes);

    #ifndef MTCODER__USE_WRITE_THREAD
    {
      /* the counter is incremented before numStartedThreads is read */
      unsigned finishedCount = (unsigned)InterlockedIncrement(&coder->numFinishedThreads);
      if (finishedCount == coder->numStartedThreads
          && Event_Set(&coder->finishedEvent) != 0)
        return SZ_ERROR_THREAD;
    }
    #endif
  }

  return SZ_ERROR_THREAD;
}



/*
  Puts (p) into its initial empty state: settings are cleared, all
  synchronization objects and thread slots are constructed (not created),
  and both critical sections are initialized.
*/
void MtCoder_Construct(CMtCoder *p)
{
  unsigned i;

  /* settings that MtCoder_Code() reads */
  p->numThreadsMax = 0;
  p->blockSize = 0;
  p->expectedDataSize = (UInt64)(Int64)-1;

  /* input source */
  p->inStream = NULL;
  p->inData = NULL;
  p->inDataSize = 0;

  /* external interfaces */
  p->progress = NULL;
  p->allocBig = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  /* no per-thread input buffers are allocated yet */
  p->allocatedBufsSize = 0;

  Event_Construct(&p->readEvent);
  Semaphore_Construct(&p->blocksSemaphore);

  /* every thread slot knows its owner and its own index */
  for (i = 0; i < MTCODER__THREADS_MAX; i++)
  {
    CMtCoderThread *slot = &p->threads[i];
    Thread_Construct(&slot->thread);
    Event_Construct(&slot->startEvent);
    slot->stop = False;
    slot->inBuf = NULL;
    slot->index = i;
    slot->mtCoder = p;
  }

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Construct(&p->writeEvents[i]);
  #else
    Event_Construct(&p->finishedEvent);
  #endif

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}




/*
  Destructs all thread slots (which stops and joins existing threads and
  frees their input buffers), then closes the shared synchronization objects.
  The critical sections are not touched here.

  Note: stopReading / readEvent are not signaled by this function.
*/
static void MtCoder_Free(CMtCoder *p)
{
  unsigned i = 0;

  /* threads are destructed first, before the objects below are closed */
  while (i < MTCODER__THREADS_MAX)
  {
    MtCoderThread_Destruct(&p->threads[i]);
    i++;
  }

  Event_Close(&p->readEvent);
  Semaphore_Close(&p->blocksSemaphore);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Close(&p->writeEvents[i]);
  #else
    Event_Close(&p->finishedEvent);
  #endif
}


/*
  Final cleanup of (p): releases threads, buffers and synchronization objects
  with MtCoder_Free(), then deletes both critical sections that
  MtCoder_Construct() initialized.
*/
void MtCoder_Destruct(CMtCoder *p)
{
  MtCoder_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
  CriticalSection_Delete(&p->cs);
}


/*
  MtCoder_Code() - runs one multi-threaded coding pass with the settings stored in (p).

  Steps:
    - derives the number of threads and block slots from p->numThreadsMax and p->blockSize,
    - frees the per-thread input buffers if p->blockSize differs from p->allocatedBufsSize,
    - (re)creates the events and the blocks semaphore and resets the shared state,
    - starts the first worker thread (further threads are started from ThreadFunc2()),
    - with MTCODER__USE_WRITE_THREAD: writes the blocks in order in the calling thread;
      otherwise: waits for p->finishedEvent.

  Returns SZ_OK, or the first non-SZ_OK value among: the wait / block / write result,
  p->readRes, p->mtProgress.res and (without MTCODER__USE_WRITE_THREAD) p->writeRes.
  On a non-SZ_OK result at the end of the function MtCoder_Free(p) is called.

  NOTE(review): the early returns made by RINOK_THREAD() / RINOK() below leave
  the function without calling MtCoder_Free(p).
*/
SRes MtCoder_Code(CMtCoder *p)
{
  unsigned numThreads = p->numThreadsMax;
  unsigned numBlocksMax;
  unsigned i;
  SRes res = SZ_OK;

  if (numThreads > MTCODER__THREADS_MAX)
    numThreads = MTCODER__THREADS_MAX;
  numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
  
  /* one extra block slot for each threshold the block size is below: 64 MiB, 16 MiB, 4 MiB */
  if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
  if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
  if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;

  if (numBlocksMax > MTCODER__BLOCKS_MAX)
    numBlocksMax = MTCODER__BLOCKS_MAX;

  /* input buffers of another size can not be reused: free them;
     ThreadFunc2() allocates a buffer again when a thread has none */
  if (p->blockSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTCODER__THREADS_MAX; i++)
    {
      CMtCoderThread *t = &p->threads[i];
      if (t->inBuf)
      {
        ISzAlloc_Free(p->allocBig, t->inBuf);
        t->inBuf = NULL;
      }
    }
    p->allocatedBufsSize = p->blockSize;
  }

  p->readRes = SZ_OK;

  MtProgress_Init(&p->mtProgress, p->progress);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < numBlocksMax; i++)
    {
      RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
    }
  #else
    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
  #endif

  {
    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));

    /* the semaphore is created anew for every run, with both count arguments equal to numBlocksMax */
    if (Semaphore_IsCreated(&p->blocksSemaphore))
    {
      RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
    }
    RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
  }

  /* free list of buffer indexes: 0 -> 1 -> ... -> (MTCODER__BLOCKS_MAX - 1) -> (unsigned)(int)-1 */
  for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
    p->freeBlockList[i] = i + 1;
  p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
  p->freeBlockHead = 0;

  p->readProcessed = 0;
  p->blockIndex = 0;
  p->numBlocksMax = numBlocksMax;
  p->stopReading = False;

  #ifndef MTCODER__USE_WRITE_THREAD
    p->writeIndex = 0;
    p->writeRes = SZ_OK;
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      p->ReadyBlocks[i] = False;
    p->numFinishedThreads = 0;
  #endif

  p->numStartedThreadsLimit = numThreads;
  p->numStartedThreads = 0;

  /* only the first thread is started here; ThreadFunc2() starts more threads while data remains */
  // for (i = 0; i < numThreads; i++)
  {
    CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
    RINOK(MtCoderThread_CreateAndStart(nextThread));
  }

  /* allow the first thread to read */
  RINOK_THREAD(Event_Set(&p->readEvent))

  #ifdef MTCODER__USE_WRITE_THREAD
  {
    /* the calling thread walks the block slots cyclically and processes them in order */
    unsigned bi = 0;

    for (;; bi++)
    {
      if (bi >= numBlocksMax)
        bi = 0;

      /* wait until a worker thread has stored its result in slot (bi) */
      RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))

      {
        const CMtCoderBlock *block = &p->blocks[bi];
        unsigned bufIndex = block->bufIndex;
        BoolInt finished = block->finished;
        /* the first error is kept; nothing is written after it */
        if (res == SZ_OK && block->res != SZ_OK)
          res = block->res;

        if (bufIndex != (unsigned)(int)-1)
        {
          if (res == SZ_OK)
          {
            res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
            if (res != SZ_OK)
              MtProgress_SetError(&p->mtProgress, res);
          }
          
          /* return the buffer index to the free list */
          CriticalSection_Enter(&p->cs);
          {
            p->freeBlockList[bufIndex] = p->freeBlockHead;
            p->freeBlockHead = bufIndex;
          }
          CriticalSection_Leave(&p->cs);
        }
        
        /* the slot is free again */
        RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))

        if (finished)
          break;
      }
    }
  }
  #else
  {
    /* finishedEvent is set in ThreadFunc() when numFinishedThreads equals numStartedThreads */
    WRes wres = Event_Wait(&p->finishedEvent);
    res = MY_SRes_HRESULT_FROM_WRes(wres);
  }
  #endif

  /* result priority: (res), then read error, then registered progress error, then write error */
  if (res == SZ_OK)
    res = p->readRes;

  if (res == SZ_OK)
    res = p->mtProgress.res;

  #ifndef MTCODER__USE_WRITE_THREAD
    if (res == SZ_OK)
      res = p->writeRes;
  #endif

  /* after an error the threads and synchronization objects are released */
  if (res != SZ_OK)
    MtCoder_Free(p);
  return res;
}

#endif