483 lines
14 KiB
C++
483 lines
14 KiB
C++
//////////////////////////////////////////////////////////////////////////
|
|
// Implementation of CRefReadStreamProxy
|
|
// The proxy is the per-client unique object that is acquired by the Streaming
|
|
// Engine clients via StartRead() API. Several Proxies can refer to the same
|
|
// Stream (file) but perform their operations independently.
|
|
|
|
#include "stdafx.h"
|
|
#include <ISystem.h>
|
|
#include <ILog.h>
|
|
#include "RefStreamEngine.h"
|
|
#include "RefReadStream.h"
|
|
#include "RefReadStreamProxy.h"
|
|
|
|
extern ISystem* g_System;
|
|
extern CMTSafeHeap* g_pSmallHeap;
|
|
extern CMTSafeHeap* g_pBigHeap;
|
|
|
|
CRefReadStreamProxy::CRefReadStreamProxy (const char* szSource, CRefReadStream* pStream, IStreamCallback* pCallback, StreamReadParams* pParams):
|
|
m_strClient(szSource),
|
|
m_pStream (pStream),
|
|
m_pCallback(pCallback),
|
|
m_bError(false),
|
|
m_bFinished (false),
|
|
m_bFreeBuffer (false),
|
|
m_bPending (false),
|
|
m_pBuffer (NULL),
|
|
m_numBytesRead (0),
|
|
m_numRetries (0)
|
|
{
|
|
if (pParams)
|
|
m_Params = *pParams;
|
|
m_pBuffer = m_Params.pBuffer;
|
|
#if LOG_IO
|
|
g_System->GetILog()->LogToFile ("\006io:CRefReadStreamProxy %p(%s, %p)", this, szSource, pCallback);
|
|
#endif
|
|
pStream->Register(this);
|
|
}
|
|
|
|
CRefReadStreamProxy::~CRefReadStreamProxy ()
|
|
{
|
|
if (!m_bFinished && !m_bError)
|
|
OnFinishRead(ERROR_UNEXPECTED_DESTRUCTION);
|
|
if (m_bFreeBuffer && m_pBuffer)
|
|
g_pBigHeap->Free (m_pBuffer);
|
|
#if LOG_IO
|
|
g_System->GetILog()->LogToFile ("\006io:~CRefReadStreamProxy %p(%s, %p)", this, m_strClient.c_str(), m_pCallback);
|
|
#endif
|
|
m_pStream->Unregister(this);
|
|
}
|
|
|
|
// returns true if the file read was not successful.
|
|
bool CRefReadStreamProxy::IsError()
|
|
{
|
|
return m_bError;
|
|
}
|
|
|
|
// returns true if the file read was completed (successfully or unsuccessfully)
|
|
// check IsError to check if the whole requested file (piece) was read
|
|
bool CRefReadStreamProxy::IsFinished()
|
|
{
|
|
return m_bFinished;
|
|
}
|
|
|
|
// returns the number of bytes read so far (the whole buffer size if IsFinished())
|
|
unsigned int CRefReadStreamProxy::GetBytesRead (bool bWait)
|
|
{
|
|
if (m_bPending)
|
|
{
|
|
if (m_pStream->isOverlapped())
|
|
{
|
|
DWORD dwBytesRead;
|
|
if (GetOverlappedResult(m_pStream->GetFile(), &m_Overlapped, &dwBytesRead, bWait))
|
|
{
|
|
m_numBytesRead = m_nPieceOffset + dwBytesRead;
|
|
assert (dwBytesRead <= m_nPieceLength);
|
|
}
|
|
}
|
|
}
|
|
return m_numBytesRead;
|
|
}
|
|
|
|
|
|
// returns the buffer into which the data has been or will be read
|
|
// at least GetBytesRead() bytes in this buffer are guaranteed to be already read
|
|
const void* CRefReadStreamProxy::GetBuffer ()
|
|
{
|
|
return m_pBuffer;
|
|
}
|
|
|
|
// tries to stop reading the stream; this is advisory and may have no effect
|
|
// but the callback will not be called after this. If you just destructing object,
|
|
// dereference this object and it will automatically abort and release all associated resources.
|
|
void CRefReadStreamProxy::Abort()
|
|
{
|
|
// lock this object to avoid preliminary destruction
|
|
CRefReadStreamProxy_AutoPtr pLock (this);
|
|
if (m_bPending)
|
|
{
|
|
m_pStream->Abort(this);
|
|
// we need to wait to avoid letting the client freeing the buffer before the read is finished
|
|
if (!m_bFreeBuffer) // [sergiy] Comment this line (only if) out to let it complete all operations anyway
|
|
Wait();
|
|
}
|
|
else
|
|
{
|
|
m_pCallback = NULL;
|
|
m_bError = true;
|
|
m_nIOError = ERROR_USER_ABORT;
|
|
}
|
|
//assert (m_pCallback == NULL);
|
|
// perhaps the callback was already called, or perhaps not. In any case we forget about the callback
|
|
m_pCallback = NULL;
|
|
}
|
|
|
|
// tries to raise the priority of the read; this is advisory and may have no effect
|
|
void CRefReadStreamProxy::RaisePriority (unsigned nPriority)
|
|
{
|
|
if (m_Params.nPriority != nPriority)
|
|
{
|
|
m_Params.nPriority = nPriority;
|
|
m_pStream->OnRaisePriority(this, nPriority);
|
|
}
|
|
}
|
|
|
|
// unconditionally waits until the callback is called
|
|
// i.e. if the stream hasn't yet finish, it's guaranteed that the user-supplied callback
|
|
// is called before return from this function (unless no callback was specified)
|
|
void CRefReadStreamProxy::Wait()
|
|
{
|
|
// lock this object to avoid preliminary destruction
|
|
CRefReadStreamProxy_AutoPtr pLock (this);
|
|
// move it to the top of the corresponding queues
|
|
RaisePriority(INT_MAX);
|
|
|
|
// while the stream reading hasn't finished, OR the callback isn't still called, wait
|
|
while ((!m_bFinished && !m_bError) || m_pCallback)
|
|
{
|
|
m_pStream->GetEngine()->UpdateAndWait(100, IStreamEngine::FLAGS_DISABLE_CALLBACK_TIME_QUOTA);
|
|
}
|
|
}
|
|
|
|
// the interface for the actual stream
|
|
// returns true if the read has been started and no further attempts to do so are required
|
|
// returns false if couldn't start,and retry is required
|
|
// nMemQuota is the max number of bytes to allocate from the Engine's "Big Heap" for the piece of file
|
|
// that is read. Pass a big value to let it allocate as much as it wants.
|
|
bool CRefReadStreamProxy::StartRead (unsigned nMemQuota)
|
|
{
|
|
if (m_bError || m_bFinished || m_bPending)
|
|
{
|
|
// Why this assert happened?
|
|
//---------------------------
|
|
// THe stream read was automatically initiated, while the stream is marked as
|
|
// having been read, being read, or failed, i.e. it can't be restarted again.
|
|
|
|
// This can generally happen when the read has been started, and immediately aborted,
|
|
// while being put on hold. When there are enough IO resources to start reading it,
|
|
// we detect that it's been already aborted and don't restart it.
|
|
// This is the only known reason for that.
|
|
assert (m_nIOError == ERROR_USER_ABORT);
|
|
return true; // invalid call, no need to retry
|
|
}
|
|
|
|
if (!m_pStream->Activate())
|
|
{
|
|
OnFinishRead(ERROR_CANT_OPEN_FILE); // can't open file
|
|
return true; // no need to retry
|
|
}
|
|
|
|
if (m_pStream->IsError())
|
|
{
|
|
OnFinishRead(ERROR_REFSTREAM_ERROR); // file is invalid
|
|
return true;
|
|
}
|
|
|
|
if (m_Params.nOffset >= m_pStream->GetFileSize())
|
|
{
|
|
// offset out of range
|
|
OnFinishRead(ERROR_OFFSET_OUT_OF_RANGE);
|
|
return true;
|
|
}
|
|
|
|
if (m_Params.nSize > m_pStream->GetFileSize())
|
|
{
|
|
OnFinishRead(ERROR_SIZE_OUT_OF_RANGE);
|
|
return true;
|
|
}
|
|
|
|
if (m_Params.nSize == 0)
|
|
{
|
|
// by default, we read the whole file
|
|
m_Params.nSize = m_pStream->GetFileSize() - m_Params.nOffset;
|
|
}
|
|
else
|
|
if (m_Params.nOffset + m_Params.nSize > m_pStream->GetFileSize())
|
|
{
|
|
// it's impossible to read the specified region of the file
|
|
OnFinishRead(ERROR_REGION_OUT_OF_RANGE);
|
|
return true;
|
|
}
|
|
|
|
if (!m_pBuffer)
|
|
{
|
|
if (nMemQuota < m_Params.nSize)
|
|
{
|
|
// try another time, of fail now if the retries are over
|
|
if (++m_numRetries < g_numMaxRetries)
|
|
return false;
|
|
else
|
|
{
|
|
OnFinishRead(ERROR_OUT_OF_MEMORY_QUOTA);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
m_pBuffer = g_pBigHeap->Alloc(m_Params.nSize, "CRefReadStreamProxy::StartRead: m_pBuffer");
|
|
if (!m_pBuffer)
|
|
{
|
|
OnFinishRead(ERROR_OUT_OF_MEMORY);
|
|
return true;
|
|
}
|
|
m_bFreeBuffer = true;
|
|
}
|
|
|
|
HANDLE hFile = m_pStream->GetFile();
|
|
m_nPieceOffset = 0; // we're just start reading
|
|
// we should load in blocks, if we load overlapped;
|
|
// we should load in one continuous read, if we load non-overlapped
|
|
unsigned nMaxBlockLength = m_pStream->GetEngine()->GetPak()->GetPakVars()->nReadSlice * 1024;
|
|
if (!nMaxBlockLength)
|
|
nMaxBlockLength = g_nBlockLength;
|
|
m_nPieceLength = m_pStream->isOverlapped() ? min (m_Params.nSize, nMaxBlockLength) : m_Params.nSize;
|
|
m_numBytesRead = 0;
|
|
|
|
// lock the object for the time of read operation
|
|
this->AddRef();
|
|
|
|
if ((m_Params.nFlags & SRP_FLAGS_ASYNC_PROGRESS) && m_pCallback)
|
|
m_pCallback->StreamOnProgress(this);
|
|
|
|
m_bPending = true;
|
|
++g_numPendingOperations;
|
|
DWORD dwError = CallReadFileEx ();
|
|
if (dwError)
|
|
{
|
|
m_bPending = false;
|
|
--g_numPendingOperations;
|
|
|
|
bool bResult = true; // by default, signal an error
|
|
switch (dwError)
|
|
{
|
|
#if !defined(LINUX)
|
|
case ERROR_NOT_ENOUGH_MEMORY:
|
|
case ERROR_INVALID_USER_BUFFER:
|
|
#endif
|
|
case ERROR_NO_SYSTEM_RESOURCES:
|
|
if (++m_numRetries < g_numMaxRetries)
|
|
bResult = false; // try again
|
|
}
|
|
// if bResult == false, it means we will want to try again, so we don't finish reading in this case
|
|
if (bResult)
|
|
OnFinishRead(dwError);
|
|
|
|
this->Release();
|
|
return bResult;
|
|
}
|
|
else
|
|
return true;
|
|
}
|
|
|
|
unsigned CRefReadStreamProxy::g_numPendingOperations = 0;
|
|
|
|
VOID CALLBACK CRefReadStreamProxy::FileIOCompletionRoutine (
|
|
DWORD dwErrorCode, // completion code
|
|
DWORD dwNumberOfBytesTransfered, // number of bytes transferred
|
|
LPOVERLAPPED lpOverlapped // I/O information buffer
|
|
)
|
|
{
|
|
#if defined(LINUX)
|
|
assert(lpOverlapped->pCaller != 0);
|
|
CRefReadStreamProxy* pThis = (CRefReadStreamProxy*)lpOverlapped->pCaller;
|
|
#else
|
|
const LONG_PTR nOOffset = (LONG_PTR)(&((CRefReadStreamProxy*)0)->m_Overlapped);
|
|
CRefReadStreamProxy* pThis = (CRefReadStreamProxy*)((LONG_PTR)lpOverlapped - nOOffset);
|
|
#endif
|
|
// this is only called when the stream is overlapped
|
|
assert (pThis->m_pStream->isOverlapped());
|
|
pThis->OnIOComplete (dwErrorCode, dwNumberOfBytesTransfered);
|
|
}
|
|
|
|
void CRefReadStreamProxy::OnIOComplete(unsigned nError, unsigned numBytesRead)
|
|
{
|
|
m_numBytesRead = m_nPieceOffset + numBytesRead;
|
|
|
|
// if there are more bytes read than was requested (e.g. because of the sector content after the EOF), we trim it.
|
|
if (!nError && m_numBytesRead > m_Params.nSize)
|
|
m_numBytesRead = m_Params.nSize;
|
|
|
|
--g_numPendingOperations;
|
|
m_bPending = false;
|
|
|
|
// calculate the next piece offset/length
|
|
m_nPieceOffset = m_numBytesRead;
|
|
assert (m_Params.nSize>=m_numBytesRead);
|
|
|
|
unsigned nMaxBlockLength = m_pStream->GetEngine()->GetPak()->GetPakVars()->nReadSlice * 1024;
|
|
if (!nMaxBlockLength)
|
|
nMaxBlockLength = g_nBlockLength;
|
|
|
|
m_nPieceLength = min (m_Params.nSize-m_numBytesRead,nMaxBlockLength);
|
|
|
|
// if there's nothing more to read,
|
|
// or there's an error, finish reading
|
|
if (nError || !m_nPieceLength)
|
|
{
|
|
OnFinishRead(nError);
|
|
|
|
// unlock the object for the time of read operation
|
|
this->Release();
|
|
}
|
|
else
|
|
{
|
|
// there's no error and there's still a piece to read. Read it further.
|
|
// we can't call progress messages from another thread
|
|
//if (m_pCallback)
|
|
// m_pCallback->StreamOnProgress(this);
|
|
|
|
m_bPending = true;
|
|
DWORD dwError = CallReadFileEx();
|
|
if (dwError)
|
|
{
|
|
m_bPending = false;
|
|
OnFinishRead(dwError);
|
|
this->Release();
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
++g_numPendingOperations;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// on the platforms that support overlapped IO, calls ReadFileEx.
|
|
// on other platforms merely reads the file, calling OnIOComplete()
|
|
DWORD CRefReadStreamProxy::CallReadFileEx ()
|
|
{
|
|
HANDLE hFile = m_pStream->GetFile();
|
|
|
|
if (hFile == INVALID_HANDLE_VALUE)
|
|
{
|
|
// this is a special case, reading from the cache memory directly - we should handle it separately
|
|
void* pSource = m_pStream->GetFileData();
|
|
if (!pSource)
|
|
{
|
|
OnIOComplete(ERROR_ZIP_CACHE_FAILURE,0);
|
|
return 0;
|
|
}
|
|
memcpy (((char*)m_pBuffer) + m_nPieceOffset, ((char*)pSource) + m_Params.nOffset + m_nPieceOffset, m_nPieceLength);
|
|
OnIOComplete(0, m_nPieceLength);
|
|
return 0;
|
|
}
|
|
|
|
if (m_pStream->isOverlapped())
|
|
{
|
|
memset (&m_Overlapped, 0, sizeof(m_Overlapped));
|
|
m_Overlapped.Offset = m_Params.nOffset + m_nPieceOffset + m_pStream->GetArchiveOffset();
|
|
#if defined(LINUX)
|
|
m_Overlapped.pCaller = (void*)this;//store caller address here
|
|
#endif
|
|
if (!ReadFileEx (hFile, ((char*)m_pBuffer) + m_nPieceOffset, m_nPieceLength, &m_Overlapped, FileIOCompletionRoutine))
|
|
{
|
|
DWORD dwError = GetLastError();
|
|
if (!dwError)
|
|
dwError = ERROR_CANT_START_READING;
|
|
|
|
return dwError;
|
|
}
|
|
else
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
// the actual number of bytes read
|
|
DWORD dwRead = 0;
|
|
unsigned newOffset = m_Params.nOffset + m_nPieceOffset + m_pStream->GetArchiveOffset();
|
|
if (SetFilePointer (hFile, newOffset, NULL, FILE_BEGIN) != newOffset)
|
|
{
|
|
// the positioning error is strange, we should examine it and perhaps retry (in case the file write wasn't finished.)
|
|
DWORD dwError = GetLastError();
|
|
return dwError;
|
|
}
|
|
// just read the file
|
|
if (!ReadFile (hFile, ((char*)m_pBuffer) + m_nPieceOffset, m_nPieceLength, &dwRead, NULL))
|
|
{
|
|
// we failed to read; we don't call the callback, but we could as well call OnIOComplete()
|
|
// with this error code and return 0 as success flag emulating error during load
|
|
DWORD dwError = GetLastError();
|
|
//return dwError;
|
|
// we call the callback to signal about the error, and return 0 signaling that the operation has completed
|
|
OnIOComplete(dwError, dwRead);
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
OnIOComplete (0,dwRead);
|
|
return 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
void CRefReadStreamProxy::OnFinishRead(unsigned nError)
|
|
{
|
|
// [marco] commented out, according to sergiy this is harmless
|
|
//assert (!m_bFinished && !m_bError);
|
|
if (!nError)
|
|
m_bFinished = true;
|
|
else
|
|
m_bError = true;
|
|
|
|
m_nIOError = nError;
|
|
m_pStream->OnIOExecuted(this);
|
|
}
|
|
|
|
// this returns true after the main IO job has been executed (either in worker or in main thread)
|
|
bool CRefReadStreamProxy::IsIOExecuted()
|
|
{
|
|
return IsFinished() || IsError();
|
|
}
|
|
|
|
// this gets called upon the IO has been executed to call the callbacks
|
|
void CRefReadStreamProxy::FinalizeIO()
|
|
{
|
|
if (m_pCallback)
|
|
{
|
|
// be carefull! this object can be deallocated inside the callback!
|
|
IStreamCallback* pCallback = m_pCallback;
|
|
m_pCallback = NULL; // don't call this callback any more as it may have been deallocated
|
|
#if LOG_IO
|
|
g_System->GetILog()->LogToFile ("\006io(%s) err %d%s%s%s%s piece(%d:%d) read %d %s userdata %d, pri %d, flags %x, offs %d, size %d", m_strClient.c_str(), m_nIOError,
|
|
m_bError?" Error":"", m_bFinished?" Finished":"", m_bFreeBuffer?" FreeBuffer":"", m_bPending?" Pending":"",
|
|
m_nPieceOffset, m_nPieceLength,
|
|
m_numBytesRead,
|
|
m_pStream->GetFileName().c_str(),
|
|
m_Params.dwUserData,
|
|
m_Params.nPriority,
|
|
m_Params.nFlags,
|
|
m_Params.nOffset,m_Params.nSize);
|
|
#endif
|
|
pCallback->StreamOnComplete(this, m_bError?m_nIOError:0);
|
|
#if LOG_IO
|
|
g_System->GetILog()->LogToFile ("\006io callback %p returned", pCallback);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
string CRefReadStreamProxy::Dump()
|
|
{
|
|
char szDump[0x300];
|
|
_snprintf (szDump, sizeof(szDump), "%s: callback %p, %s%s%s %d bytes read, offset=%d, size=%d, flags=%x",
|
|
m_strClient.c_str(),
|
|
m_pCallback,
|
|
m_bPending?"PENDING ":"",
|
|
m_bFinished?"FINISHED ":"",
|
|
m_bError?"ERROR ":"",
|
|
m_numBytesRead,
|
|
m_Params.nOffset,
|
|
m_Params.nSize,
|
|
m_Params.nFlags);
|
|
return szDump;
|
|
}
|
|
|
|
// returns the size of allocated memory for this object and all subobjects if any
|
|
size_t CRefReadStreamProxy::GetSize()
|
|
{
|
|
size_t nSize = sizeof(*this);
|
|
nSize += m_strClient.capacity();
|
|
if (m_pBuffer)
|
|
nSize += m_Params.nSize;
|
|
return nSize;
|
|
} |