/*-------------------------------------------------------------------------
* drawElements C++ Base Library
* -----------------------------
*
* Copyright 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*//*!
* \file
* \brief Thread-safe ring buffer template.
*//*--------------------------------------------------------------------*/
#include "deThreadSafeRingBuffer.hpp"
#include "deRandom.hpp"
#include "deThread.hpp"
#include <vector>
using std::vector;
namespace de
{
namespace
{
struct Message
{
deUint32 data;
Message (deUint16 threadId, deUint16 payload)
: data((threadId << 16) | payload)
{
}
Message (void)
: data(0)
{
}
deUint16 getThreadId (void) const { return data >> 16; }
deUint16 getPayload (void) const { return data & 0xffff; }
};
class Consumer : public Thread
{
public:
Consumer (ThreadSafeRingBuffer<Message>& buffer, int numProducers)
: m_buffer (buffer)
{
m_lastPayload.resize(numProducers, 0);
m_payloadSum.resize(numProducers, 0);
}
void run (void)
{
for (;;)
{
Message msg = m_buffer.popBack();
deUint16 threadId = msg.getThreadId();
if (threadId == 0xffff)
break;
DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
m_lastPayload[threadId] = msg.getPayload();
m_payloadSum[threadId] += (deUint32)msg.getPayload();
}
}
deUint32 getPayloadSum (deUint16 threadId) const
{
return m_payloadSum[threadId];
}
private:
ThreadSafeRingBuffer<Message>& m_buffer;
vector<deUint16> m_lastPayload;
vector<deUint32> m_payloadSum;
};
class Producer : public Thread
{
public:
Producer (ThreadSafeRingBuffer<Message>& buffer, deUint16 threadId, int dataSize)
: m_buffer (buffer)
, m_threadId (threadId)
, m_dataSize (dataSize)
{
}
void run (void)
{
// Yield to give main thread chance to start other producers.
deSleep(1);
for (int ndx = 0; ndx < m_dataSize; ndx++)
m_buffer.pushFront(Message(m_threadId, (deUint16)ndx));
}
private:
ThreadSafeRingBuffer<Message>& m_buffer;
deUint16 m_threadId;
int m_dataSize;
};
} // anonymous
void ThreadSafeRingBuffer_selfTest (void)
{
const int numIterations = 16;
for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
{
Random rnd (iterNdx);
int bufSize = rnd.getInt(1, 2048);
int numProducers = rnd.getInt(1, 16);
int numConsumers = rnd.getInt(1, 16);
int dataSize = rnd.getInt(1000, 10000);
ThreadSafeRingBuffer<Message> buffer (bufSize);
vector<Producer*> producers;
vector<Consumer*> consumers;
for (int i = 0; i < numProducers; i++)
producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
for (int i = 0; i < numConsumers; i++)
consumers.push_back(new Consumer(buffer, numProducers));
// Start consumers.
for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
(*i)->start();
// Start producers.
for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
(*i)->start();
// Wait for producers.
for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
(*i)->join();
// Write end messages for consumers.
for (int i = 0; i < numConsumers; i++)
buffer.pushFront(Message(0xffff, 0));
// Wait for consumers.
for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
(*i)->join();
// Verify payload sums.
deUint32 refSum = 0;
for (int i = 0; i < dataSize; i++)
refSum += (deUint32)(deUint16)i;
for (int i = 0; i < numProducers; i++)
{
deUint32 cmpSum = 0;
for (int j = 0; j < numConsumers; j++)
cmpSum += consumers[j]->getPayloadSum(i);
DE_TEST_ASSERT(refSum == cmpSum);
}
// Free resources.
for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
delete *i;
for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
delete *i;
}
}
} // de