/////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2005-2017 Dawson Dean
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
// 
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
// 
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
/////////////////////////////////////////////////////////////////////////////
//
// AsyncIOStream Module
//
// This implements basic stream interface on top of block IO devices like
// files, networks, and memory regions. It presents the data as a continuous
// stream of bytes, so it hides packet, sector and block boundaries.
//
// An AsyncIOStream keeps the data in a queue of buffers. The number of buffers
// will vary depending on the underlying block device. If the device is seekable,
// like a file, then the AsyncIOStream keeps only a few memory buffers, and
// synchronously reads buffers from the file as they are needed. This allows
// a data stream to keep a small memory footprint for a huge file. It also
// means the AsyncIOStream acts like a cache, so it orders the buffers in LRU
// order and implements a replacement policy.
//
// If the underlying blockIO is not seekable, like a network connection, then
// the AsyncIOStream keeps all buffers in memory. If this becomes a problem,
// like using HTTP to download huge documents, then a higher level of the code,
// the HTTP stream module, will transfer data from a http AsyncIOStream to
// a file AsyncIOStream.
//
// Finally, some asyncIO streams can just keep data in buffers, like a growing
// bag of bytes in memory. This is useful to first format and write some data
// to a temporary stream and then transfer that to another network or file
// stream.
//
// This module provides asynchrony with the "listen" and "flush" functions.
// Reading and writing a stream never blocks, and is never asynchronous. Instead,
// it always reads or writes a memory buffer. This is important because it means
// that stream reading code, like a grammar parser, never has the complexity of
// asynchrony. Instead, clients first ask the IO stream to asynchronously report
// when some data is available, either a specific number of bytes or any bytes.
// This will signal a callback function (the event handler), and then the client
// can do the reads.
//
// Similarly, when you write to a stream, the data is internally written to a
// buffer and may be flushed to the backing store asynchronously. There is
// an asynchronous flush command which will notify the client when all data
// in the stream has been committed to the backing store.
//
// There is one callback function associated with each stream. This receives
// all notifications that data is available or a write has completed. Normally,
// this is provided when the stream is opened, but it can also be dynamically
// changed when the "owner" of a stream is changed. Originally, I wanted a
// different callback for each operation, but this had several problems. For
// example, some events (like a network peer disconnects) may happen at any
// time, so you really need a constant listener callback. Individual callbacks
// also add complications when several asynch-listens are active at once; you
// must match each callback to each listen.
//
// Internally, the buffers never overlap, and each corresponds to a single buffer
// that we would read or write in the underlying block device.
// There is one active foreground buffer where the stream is currently reading and
// possibly writing. Other buffers are unused, or valid but
// idle (cache), or in transit with an asynchronous read or write.
//
/////////////////////////////////////////////////////////////////////////////

#include "osIndependantLayer.h"
#include "config.h"
#include "log.h"
#include "debugging.h"
#include "memAlloc.h"
#include "refCount.h"
#include "threads.h"
#include "stringLib.h"
#include "stringParse.h"
#include "queue.h"
#include "jobQueue.h"
#include "rbTree.h"
#include "nameTable.h"
#include "url.h"
#include "blockIO.h"
#include "asyncIOStream.h"

// File-scope debugging/logging declarations produced by a project macro.
// NOTE(review): presumably the arguments are this file's default log level and
// an options/flags word -- confirm against the macro's definition.
FILE_DEBUGGING_GLOBALS(LOG_LEVEL_DEFAULT, 0);





/////////////////////////////////////////////////////////////////////////////
//
// [CAsyncIOStream]
//
// This procedure leaves the AsyncIOStream in an initial state so
// other methods can tell that init has not been called yet.
/////////////////////////////////////////////////////////////////////////////
CAsyncIOStream::CAsyncIOStream() {
    // No flags, lock, IO system or block device until the stream is opened.
    // A NULL m_pBlockIO is what other methods test to detect "not open".
    m_AsyncIOStreamFlags = 0;
    m_pLock = NULL;
    m_pIOSystem = NULL;
    m_pBlockIO = NULL;

    // No listener is attached yet.
    m_pEventHandlerContext = NULL;
    m_pEventHandler = NULL;

    // Stream extent.
    m_MinPosition = 0;
    m_TotalAvailableBytes = 0;

    // Input side: an empty buffer list and no active buffer or cursors.
    m_IOBufferList.ResetQueue();
    m_MaxNumIOBuffersForSeekableDevices = 1;
    m_pActiveIOBuffer = NULL;
    m_pLastPossibleValidByte = NULL;
    m_pEndValidBytes = NULL;
    m_pNextValidByte = NULL;
    m_pFirstValidByte = NULL;

    // Output side: an empty buffer list and no active buffer or cursors.
    m_OutputBufferList.ResetQueue();
    m_pActiveOutputIOBuffer = NULL;
    m_pLastPossibleValidOutputByte = NULL;
    m_pNextValidOutputByte = NULL;
    m_pFirstValidOutputByte = NULL;

    // No asynchronous load is in progress.
    m_AsynchLoadType = NOT_LOADING;
    m_NextLoadStartPosition = 0;
    m_LoadStopPosition = 0;
    m_LoadStartPosition = 0;
    m_NextAsynchBufferPosition = 0;
} // CAsyncIOStream.







/////////////////////////////////////////////////////////////////////////////
//
// [~CAsyncIOStream]
//
/////////////////////////////////////////////////////////////////////////////
CAsyncIOStream::~CAsyncIOStream() {
    // A non-NULL blockIO means Close() has not run yet, so run it now
    // before dropping the references this object holds.
    const bool fStillOpen = (m_pBlockIO != NULL);
    if (fStillOpen) {
        Close();
    }

    // Drop the references held on the event handler and on the lock.
    RELEASE_OBJECT(m_pEventHandler);
    RELEASE_OBJECT(m_pLock);
} // ~CAsyncIOStream.







/////////////////////////////////////////////////////////////////////////////
//
// [OpenAsyncIOStream]
//
/////////////////////////////////////////////////////////////////////////////
// Creates a new CAsyncIOStream, records the event handler on it, and asks the
// IO system that matches the URL scheme to open a blockIO for it.
//
// pUrl                 - What to open; must not be NULL. Its m_Scheme selects
//                        the network, file or memory IO system.
// openOptions          - Passed through to OpenBlockIO. File and memory
//                        streams additionally get USE_SYNCHRONOUS_IO.
// pEventHandler        - Receives the stream's events; must not be NULL.
// pEventHandlerContext - Opaque value stored alongside the handler.
//
// Returns ENoErr when the open was started, the error from StartOpenCommand
// or OpenBlockIO, or EFail for bad arguments, allocation failure, or a URL
// scheme that cannot be opened.
//
// The local reference to the new stream is always released before returning.
// NOTE(review): presumably OpenBlockIO takes its own reference on the stream
// and hands it to the event handler on completion -- confirm in the IO systems.
ErrVal
CAsyncIOStream::OpenAsyncIOStream(
                        CParsedUrl *pUrl,
                        int32 openOptions,
                        CAsyncIOEventHandler *pEventHandler,
                        void *pEventHandlerContext
                       ) {
    ErrVal err = ENoErr;
    CAsyncIOStream *pAsyncIOStream = NULL;

    if ((NULL == pUrl) || (NULL == pEventHandler)) {
        gotoErr(EFail);
    }

    pAsyncIOStream = newex CAsyncIOStream;
    if (NULL == pAsyncIOStream) {
        gotoErr(EFail);
    }

    DEBUG_LOG("CAsyncIOStream::OpenAsyncIOStream. url = %s",
                pUrl->GetPrintableURL());

    // Don't call RunChecks because we do not have an open stream yet.
    err = pAsyncIOStream->StartOpenCommand(pEventHandler, pEventHandlerContext);
    if (err) {
        gotoErr(err);
    }

    if ((CParsedUrl::URL_SCHEME_HTTP == pUrl->m_Scheme)
        || (CParsedUrl::URL_SCHEME_URN == pUrl->m_Scheme)
        || (CParsedUrl::URL_SCHEME_FTP == pUrl->m_Scheme)
        || (CParsedUrl::URL_SCHEME_IP_ADDRESS == pUrl->m_Scheme)) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::OpenAsyncIOStream. Call g_pNetIOSystem->OpenBlockIO");
        err = g_pNetIOSystem->OpenBlockIO(pUrl, openOptions, pAsyncIOStream);
    } else if (CParsedUrl::URL_SCHEME_FILE == pUrl->m_Scheme) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::OpenAsyncIOStream. Call g_pFileIOSystem->OpenBlockIO");
        openOptions |= CAsyncBlockIO::USE_SYNCHRONOUS_IO;
        err = g_pFileIOSystem->OpenBlockIO(pUrl, openOptions, pAsyncIOStream);
    } else if (CParsedUrl::URL_SCHEME_MEMORY == pUrl->m_Scheme) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::OpenAsyncIOStream. Call g_pMemoryIOSystem->OpenBlockIO");
        openOptions |= CAsyncBlockIO::USE_SYNCHRONOUS_IO;
        err = g_pMemoryIOSystem->OpenBlockIO(pUrl, openOptions, pAsyncIOStream);
    } else if (CParsedUrl::URL_SCHEME_EMPTY == pUrl->m_Scheme) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::OpenAsyncIOStream. Call g_EmptyIOSystem.OpenBlockIO");
        // Nothing opens an empty-scheme URL. Report the failure explicitly:
        // if err stayed ENoErr, a build where ASSERT compiles away would tell
        // the caller the open had started even though no blockIO was created.
        ASSERT(0);
        err = EFail;
    } else {
        err = EFail;
    }
    DEBUG_LOG_VERBOSE("CAsyncIOStream::OpenAsyncIOStream. OpenBlockIO returns err=%d", err);

    // DO NOT DO ANY WORK HERE.
    // OpenBlockIO is asynch, so it may have already signalled
    // completion by now.

abort:
    RELEASE_OBJECT(pAsyncIOStream);

    returnErr(err);
} // OpenAsyncIOStream.








/////////////////////////////////////////////////////////////////////////////
//
// [StartOpenCommand]
//
/////////////////////////////////////////////////////////////////////////////
// Prepares a stream that is not open for a new open operation: installs the
// event handler and resets the flags, cursors and buffer lists.
//
// pEventHandler        - Handler to install; may be NULL.
// pEventHandlerContext - Opaque value stored alongside the handler.
//
// Returns EFail if the stream already has a blockIO (it is already open),
// otherwise ENoErr.
ErrVal
CAsyncIOStream::StartOpenCommand(
                        CAsyncIOEventHandler *pEventHandler,
                        void *pEventHandlerContext) {
    ErrVal err = ENoErr;

    // If this stream is already open, then we cannot re-open it.
    if (m_pBlockIO) {
        gotoErr(EFail);
    }

    m_AsyncIOStreamFlags = 0;

    // It is OK for pEventHandler to be NULL.
    //
    // Close() does not release m_pEventHandler, so a stream that is opened
    // again may still hold a handler from its previous open. Release that
    // reference instead of overwriting (and leaking) it. The new reference is
    // taken first so that re-installing the same handler can never drop its
    // last reference.
    ADDREF_OBJECT(pEventHandler);
    RELEASE_OBJECT(m_pEventHandler);
    m_pEventHandler = pEventHandler;
    m_pEventHandlerContext = pEventHandlerContext;

    // Initially, there are no bytes in the buffer.
    m_pActiveIOBuffer = NULL;
    m_pFirstValidByte = NULL;
    m_pNextValidByte = NULL;
    m_pEndValidBytes = NULL;
    m_pLastPossibleValidByte = NULL;

    m_pActiveOutputIOBuffer = NULL;
    m_pFirstValidOutputByte = NULL;
    m_pNextValidOutputByte = NULL;
    m_pLastPossibleValidOutputByte = NULL;

    m_IOBufferList.ResetQueue();
    m_OutputBufferList.ResetQueue();

    m_MaxNumIOBuffersForSeekableDevices = 1;
    m_TotalAvailableBytes = 0;

abort:
    returnErr(err);
} // StartOpenCommand.







/////////////////////////////////////////////////////////////////////////////
//
// [FinishOpenCommand]
//
/////////////////////////////////////////////////////////////////////////////
// Completes an open once m_pBlockIO has been attached to the stream: caches
// the IO system and media size, takes the blockIO's lock, and, for memory
// URLs, wraps the caller-supplied memory region in a single IO buffer.
//
// A memory URL's path is parsed as "<buffer pointer>/<buffer length>/<number
// of valid bytes>".
//
// Returns EFail if there is no blockIO, the blockIO has no lock, the memory
// URL has no path or cannot be parsed, or no IO buffer could be allocated;
// otherwise ENoErr.
ErrVal
CAsyncIOStream::FinishOpenCommand() {
    ErrVal err = ENoErr;
    char cSaveChar;
    char *pFilePtr;
    char *pEndFilePtr;
    char *pBuffer = NULL;
    void *pParsedBuffer = NULL;
    int32 bufferLength = 0;
    int32 numValidBytes = 0;
    int32 numFieldsRead;

    DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishOpenCommand");
    if (NULL == m_pBlockIO) {
        returnErr(EFail);
    }

    m_pIOSystem = m_pBlockIO->GetIOSystem();
    m_TotalAvailableBytes = m_pBlockIO->GetMediaSize();

    // The lock may have been held by a previous blockIO.
    // This can happen when we accept a network connection.
    RELEASE_OBJECT(m_pLock);
    m_pLock = m_pBlockIO->GetLock();
    if (NULL == m_pLock) {
        gotoErr(EFail);
    }

    // If this is a memory blockIO, then make one buffer entry that
    // references the contents of the blockIO.
    if ((NULL != m_pBlockIO->m_pUrl)
        && (CParsedUrl::URL_SCHEME_MEMORY == m_pBlockIO->m_pUrl->m_Scheme)) {
        // Parse the URL.
        pFilePtr = m_pBlockIO->m_pUrl->m_pPath;
        if (NULL == pFilePtr) {
            // Without a path there is nothing to parse, and the terminator
            // trick below would write through a NULL-based pointer.
            gotoErr(EFail);
        }

        // Temporarily NUL-terminate the path in place so sscanf stops at its
        // end, then put the original character back.
        // NOTE(review): this assumes the byte at m_pPath[m_PathSize] is
        // writable storage owned by the URL -- confirm in CParsedUrl.
        pEndFilePtr = pFilePtr + m_pBlockIO->m_pUrl->m_PathSize;
        cSaveChar = *pEndFilePtr;
        *pEndFilePtr = 0;
        // "%p" requires a void** argument, so parse into a void* and convert
        // afterwards rather than passing the address of a char*.
        numFieldsRead = sscanf(
                            pFilePtr,
                            "%p/%d/%d",
                            &pParsedBuffer,
                            &bufferLength,
                            &numValidBytes);
        *pEndFilePtr = cSaveChar;

        if (3 != numFieldsRead) {
            gotoErr(EFail);
        }
        pBuffer = (char *) pParsedBuffer;

        DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishOpenCommand. Memory stream. numValidBytes = %d", numValidBytes);
        if (numValidBytes > 0) {
            m_pActiveIOBuffer = g_pMemoryIOSystem->AllocIOBuffer(-1, false);
            if (NULL == m_pActiveIOBuffer) {
                gotoErr(EFail);
            }

            m_pActiveIOBuffer->m_BufferOp = CIOBuffer::NO_OP;
            m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::VALID_DATA;
            m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::INPUT_BUFFER;
            m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::OUTPUT_BUFFER;
            m_pActiveIOBuffer->m_Err = ENoErr;

            // Point the IO buffer at the caller's memory region.
            // NOTE(review): m_BufferSize is set to numValidBytes, while
            // m_pLastPossibleValidByte below uses bufferLength. SetPosition
            // recomputes m_pLastPossibleValidByte from m_BufferSize, so the
            // two can disagree after a seek -- confirm which is intended.
            m_pActiveIOBuffer->m_pPhysicalBuffer = pBuffer;
            m_pActiveIOBuffer->m_pLogicalBuffer = m_pActiveIOBuffer->m_pPhysicalBuffer;
            m_pActiveIOBuffer->m_BufferSize = numValidBytes;
            m_pActiveIOBuffer->m_NumValidBytes = numValidBytes;

            m_pActiveIOBuffer->m_PosInMedia = 0;

            // The buffer was AddRef'ed by AllocIOBuffer.
            // Keep m_IOBufferList in LRU order. New buffers are the most recently
            // used, so they are added to the head of the list.
            m_IOBufferList.InsertHead(&(m_pActiveIOBuffer->m_StreamBufferList));

            m_pFirstValidByte = m_pActiveIOBuffer->m_pLogicalBuffer;
            m_pNextValidByte = m_pFirstValidByte;
            m_pEndValidBytes = m_pActiveIOBuffer->m_pLogicalBuffer + numValidBytes;
            m_pLastPossibleValidByte = m_pActiveIOBuffer->m_pPhysicalBuffer + bufferLength;

            m_TotalAvailableBytes = numValidBytes;
        } // (numValidBytes > 0)
    } // memory URL.

abort:
    returnErr(err);
} // FinishOpenCommand.






/////////////////////////////////////////////////////////////////////////////
//
// [Close]
//
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::Close() {
    CIOBuffer *pBuffer;
    CAsyncBlockIO *pBlockIO = NULL;
    RunChecksOnce();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::Close. stream = %p", this);

    { /////////////////////////////////////////////////
        AutoLock(m_pLock);

        // Remove all buffers.
        m_pActiveIOBuffer = NULL;
        m_pFirstValidByte = NULL;
        m_pNextValidByte = NULL;
        m_pEndValidBytes = NULL;
        m_pLastPossibleValidByte = NULL;

        m_AsyncIOStreamFlags = 0;
        m_MaxNumIOBuffersForSeekableDevices = 1;
        m_AsynchLoadType = NOT_LOADING;

        m_pActiveOutputIOBuffer = NULL;
        m_pFirstValidOutputByte = NULL;
        m_pNextValidOutputByte = NULL;
        m_pLastPossibleValidOutputByte = NULL;

        // Close this outside the lock.
        pBlockIO = m_pBlockIO;
        m_pBlockIO = NULL;
        m_pIOSystem = NULL;

        while (1) {
            pBuffer = m_IOBufferList.RemoveHead();
            if (NULL == pBuffer) {
                break;
            }
            DEBUG_LOG_VERBOSE("CAsyncIOStream::Close. Release buffer %p", pBuffer);
            RELEASE_OBJECT(pBuffer);
        }
        while (1) {
            pBuffer = m_OutputBufferList.RemoveHead();
            if (NULL == pBuffer) {
                break;
            }
            DEBUG_LOG_VERBOSE("CAsyncIOStream::Close. Release buffer %p", pBuffer);
            RELEASE_OBJECT(pBuffer);
        }
        m_IOBufferList.ResetQueue();
    } /////////////////////////////////////////////////

    if (pBlockIO) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::Close. Close blockIO %p", pBlockIO);
        pBlockIO->Close();
        RELEASE_OBJECT(pBlockIO);
    }
} // Close





/////////////////////////////////////////////////////////////////////////////
//
// [GetLock]
//
/////////////////////////////////////////////////////////////////////////////
// Returns the stream's lock, or NULL when the stream has no blockIO.
// ADDREF_OBJECT is applied before the lock is returned.
// NOTE(review): presumably the caller therefore owns a reference and must
// release it -- confirm against the refCount conventions.
CRefLock *
CAsyncIOStream::GetLock() {
    // Without a blockIO there is no lock to hand out.
    if (!m_pBlockIO) {
        return(NULL);
    }

    ADDREF_OBJECT(m_pLock);
    return(m_pLock);
} // GetLock.





/////////////////////////////////////////////////////////////////////////////
//
// [SetPosition]
//
// This moves the cursor to a block of data that covers the desired position.
//
// If this stream uses a seekable device, then this affects both the read
// and write position. If this stream has a non-seekable device, then this
// only affects the read position.
/////////////////////////////////////////////////////////////////////////////
// Moves the cursor (m_pNextValidByte) to absolute stream position newPos.
//
// The lookup proceeds in this order:
//   1. newPos lies inside the active buffer: only the cursor is moved.
//   2. newPos lies inside another buffer on m_IOBufferList: that buffer
//      becomes the active one and is moved to the head of the list.
//   3. Otherwise the active buffer is handed to MoveBufferToBackground, a
//      buffer for newPos is obtained from AllocAsyncIOStreamBuffer, and a
//      read is started if the buffer starts before the end of the data.
//
// Returns:
//   ENoErr - The cursor was moved, or newPos is 0 in an empty stream.
//   EEOF   - newPos is exactly the end of the active buffer and at or past
//            m_TotalAvailableBytes; newPos is past the end of a stream that
//            is not an expanding memory stream; or the block is not buffered
//            and the device cannot be seeked.
//   EFail  - The stream has no blockIO, newPos is negative, or no buffer
//            could be allocated.
//   other  - The error recorded on the matching buffer (m_Err), or the error
//            returned by MoveBufferToBackground.
ErrVal
CAsyncIOStream::SetPosition(int64 newPos) {
    ErrVal err = ENoErr;
    int64 offset;
    CIOBuffer *pBuffer;
    bool fFoundBuffer;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE( "CAsyncIOStream::SetPosition. newPos = " INT64FMT, newPos );

    // The stream must have been initialized.
    if ((NULL == m_pBlockIO) || (newPos < 0)) {
        gotoErr(EFail);
    }

    // Check if the buffer already covers this byte position.
    // This is (hopefully) the most common case.
    if ((m_pActiveIOBuffer)
        && (m_pFirstValidByte)
        && (newPos >= m_pActiveIOBuffer->m_PosInMedia)) {
        // Strictly inside the valid bytes: just move the cursor.
        if (newPos < (m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes)) {
            offset = newPos - m_pActiveIOBuffer->m_PosInMedia;
            m_pNextValidByte = m_pFirstValidByte + offset;
            gotoErr(ENoErr);
        }

        // As a special case, handle seeking to the end of the file.
        // The cursor is still moved, but EEOF is reported. Read() calls
        // SetPosition when it runs out of buffered bytes and stops on any
        // error, so this EEOF is what ends a read at the end of the data.
        if ((newPos == (m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes))
            && (newPos >= m_TotalAvailableBytes)) {
            offset = newPos - m_pActiveIOBuffer->m_PosInMedia;
            m_pNextValidByte = m_pFirstValidByte + offset;
            gotoErr(EEOF);
        }
    } // seeking within the current buffer.

    DEBUG_LOG_VERBOSE("CAsyncIOStream::SetPosition. Need to change the current buffer");

    // Look for the buffer, it may be lying around if we are
    // seeking back in the media, or if the media is a string
    // and all data is in the buffer.
    fFoundBuffer = false;
    pBuffer = m_IOBufferList.GetHead();
    while (NULL != pBuffer) {
        // Only input buffers are candidates. A buffer matches when newPos is
        // inside its valid bytes, or when newPos is exactly its end and that
        // end is at or past the end of the available data.
        if ((newPos >= pBuffer->m_PosInMedia)
            && (newPos < (pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes))
            && (pBuffer->m_BufferFlags & CIOBuffer::INPUT_BUFFER)) {
            fFoundBuffer = true;
        } else if ((newPos >= pBuffer->m_PosInMedia)
            && (newPos == (pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes))
            && (newPos >= m_TotalAvailableBytes)
            && (pBuffer->m_BufferFlags & CIOBuffer::INPUT_BUFFER)) {
            fFoundBuffer = true;
        }

        if (fFoundBuffer) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::SetPosition. Found a new buffer (err = %d)", pBuffer->m_Err);
            // A buffer that recorded an error is not made active; its error
            // is returned instead.
            if (ENoErr != pBuffer->m_Err) {
                gotoErr(pBuffer->m_Err);
            }

            // Make this buffer the active one and rebuild all four cursors.
            m_pActiveIOBuffer = pBuffer;
            offset = newPos - m_pActiveIOBuffer->m_PosInMedia;

            m_pFirstValidByte = m_pActiveIOBuffer->m_pLogicalBuffer;
            m_pNextValidByte = m_pFirstValidByte + offset;
            m_pEndValidBytes = m_pFirstValidByte + m_pActiveIOBuffer->m_NumValidBytes;
            m_pLastPossibleValidByte = m_pActiveIOBuffer->m_pPhysicalBuffer + m_pActiveIOBuffer->m_BufferSize;

            // Keep m_IOBufferList in LRU order. If we are touching this buffer, then
            // move it to the head of the list.
            pBuffer->m_StreamBufferList.RemoveFromQueue();
            m_IOBufferList.InsertHead(&(pBuffer->m_StreamBufferList));

            // Unlike the active-buffer fast path, a match at the exact end of
            // the data reports ENoErr here rather than EEOF.
            gotoErr(ENoErr);
        } // looking at an IO that has the buffer we need.

        pBuffer = pBuffer->m_StreamBufferList.GetNextInQueue();
    } // looking at all completed IO ops for the buffer we need.

    // You can always seek to byte 0 even in an empty stream.
    // No buffer is made active in this case.
    if ((0 == newPos) && (0 == m_TotalAvailableBytes)) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::SetPosition. Seek to the start of an empty stream");
        gotoErr(ENoErr);
    }

    // Only an expanding memory stream may be positioned past the end of the
    // data that is currently available.
    if ((newPos > m_TotalAvailableBytes)
       && !(m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)) {
        DEBUG_LOG("CAsyncIOStream::SetPosition. Failed to seek past EOF. m_TotalAvailableBytes = " INT64FMT,
                  m_TotalAvailableBytes);
        gotoErr(EEOF);
    }
    // If the device is not seekable, then there is nowhere to read it
    // from (this will change in the future when we cache to temp files).
    // For now, this means we cannot find the block.
    if (!(m_pBlockIO->m_fSeekable)
       && !(m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)
       && !(m_AsyncIOStreamFlags & ALL_DATA_IS_IN_BUFFERS)) {
        DEBUG_LOG("CAsyncIOStream::SetPosition. Failed to seek past loaded buffers for a non-seekable stream");
        gotoErr(EEOF);
    }

    // We are about to change the current buffer.
    DEBUG_LOG_VERBOSE("CAsyncIOStream::SetPosition. Set the current buffer to the background");
    err = MoveBufferToBackground(m_pActiveIOBuffer);
    if (err) {
        gotoErr(err);
    }

    // We will either allocate a new block, or else recycle an existing one.
    m_pActiveIOBuffer = AllocAsyncIOStreamBuffer(newPos, true);
    if (NULL == m_pActiveIOBuffer) {
        gotoErr(EFail);
    }

    // The new buffer starts with no valid bytes, so all three read cursors
    // point at its start.
    // NOTE(review): the cursor is placed at the start of the buffer, not at
    // newPos. That is only the requested position if the buffer's m_PosInMedia
    // equals newPos -- confirm what AllocAsyncIOStreamBuffer guarantees.
    m_pFirstValidByte = m_pActiveIOBuffer->m_pLogicalBuffer;
    m_pNextValidByte = m_pActiveIOBuffer->m_pLogicalBuffer;
    m_pEndValidBytes = m_pActiveIOBuffer->m_pLogicalBuffer;
    m_pLastPossibleValidByte = m_pActiveIOBuffer->m_pPhysicalBuffer + m_pActiveIOBuffer->m_BufferSize;

    // Only start a read when the buffer begins before the end of the data and
    // this is not an expanding memory stream.
    // NOTE(review): whatever ReadBlockAsync returns is not captured here, and
    // m_pEndValidBytes is not updated in this function after the call --
    // presumably the read-completion path does that; confirm.
    if ((m_pActiveIOBuffer->m_PosInMedia < m_TotalAvailableBytes)
       && !(m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::SetPosition. Starting a new read");
        m_pBlockIO->ReadBlockAsync(m_pActiveIOBuffer);
    }

    // This is seeking to the end of a stream. We do this when we want to start
    // writing at the end. This is not an EOF error.
    // NOTE(review): assuming gotoErr jumps straight to abort, err is always
    // ENoErr when execution reaches this point (the only assignment above
    // bails out on error), so this condition cannot currently be true.
    if ((EEOF == err) && (newPos == m_TotalAvailableBytes)) {
        err = ENoErr;
    }

abort:
    returnErr(err);
} // SetPosition.







/////////////////////////////////////////////////////////////////////////////
//
// [GetPosition]
//
// This always returns the current read position. If we read and write
// in the same buffers, then this is also the write position. If we write
// and read to separate IO channels (like in a network) then there is
// no write position, since it is a non-seekable device.
/////////////////////////////////////////////////////////////////////////////
// Returns the current position in the stream: the media position of the
// active buffer plus the cursor's offset inside it. When there is no active
// buffer or the cursors are not set, m_TotalAvailableBytes is returned.
//
// This is deliberately not declared `inline`: an inline function has to be
// defined in every translation unit that uses it, so marking this out-of-line
// definition inline could leave callers in other source files without a
// definition to link against.
int64
CAsyncIOStream::GetPosition() {
    AutoLock(m_pLock);

    if ((NULL == m_pActiveIOBuffer)
        || (NULL == m_pNextValidByte)
        || (NULL == m_pFirstValidByte)) {
        return(m_TotalAvailableBytes);
    }

    return(m_pActiveIOBuffer->m_PosInMedia + (m_pNextValidByte - m_pFirstValidByte));
} // GetPosition.






/////////////////////////////////////////////////////////////////////////////
//
// [GetDataLength]
//
/////////////////////////////////////////////////////////////////////////////
// Returns the length of the stream's data: m_TotalAvailableBytes, or the end
// position of the active buffer when that lies further out. Debug builds also
// assert that no buffer on m_IOBufferList ends beyond the result.
int64
CAsyncIOStream::GetDataLength() {
    int64 bufferEndPos;
    int64 dataLength;
    AutoLock(m_pLock);

    dataLength = m_TotalAvailableBytes;

    // The active buffer may hold bytes past the recorded end of the data.
    if (NULL != m_pActiveIOBuffer) {
        bufferEndPos = m_pActiveIOBuffer->m_PosInMedia
                            + m_pActiveIOBuffer->m_NumValidBytes;
        dataLength = (bufferEndPos > dataLength) ? bufferEndPos : dataLength;
    }

#if DD_DEBUG
    // Consistency check: no listed buffer should end past the length
    // computed above.
    for (CIOBuffer *pListedBuffer = m_IOBufferList.GetHead();
         NULL != pListedBuffer;
         pListedBuffer = pListedBuffer->m_StreamBufferList.GetNextInQueue()) {
        bufferEndPos = pListedBuffer->m_PosInMedia + pListedBuffer->m_NumValidBytes;
        if (bufferEndPos > dataLength) {
            DEBUG_LOG("CAsyncIOStream::GetDataLength ERROR. maxBufferPos > eof");
            ASSERT(0);
            dataLength = bufferEndPos;
        }
    } // checking every buffer in the list.
#endif

    return(dataLength);
} // GetDataLength.







/////////////////////////////////////////////////////////////////////////////
//
// [GetPtr]
//
/////////////////////////////////////////////////////////////////////////////
// Produces a pointer to `length` bytes of stream data starting at startPos.
//
// If the active buffer (after seeking to startPos) holds valid bytes beyond
// the end of the requested range, *pPtr points directly into that buffer.
// Otherwise the bytes are copied into pTempBuffer with Read() and *pPtr is
// set to pTempBuffer.
//
// startPos    - Absolute stream position of the first byte; must be >= 0.
// length      - Number of bytes wanted; must be >= 0.
// pTempBuffer - Optional scratch buffer used when the range is not served
//               from the active buffer.
// maxLength   - Capacity of pTempBuffer.
// pPtr        - Receives the resulting pointer; set to NULL on failure.
//
// Returns EFail for bad arguments, when the copy is needed but pTempBuffer is
// NULL or smaller than `length`, or else whatever SetPosition/Read report.
//
// Note: a range that ends exactly at the end of the buffer's valid bytes is
// not served from the buffer (the comparison is strict).
ErrVal
CAsyncIOStream::GetPtr(
                int64 startPos,
                int32 length,
                char *pTempBuffer,
                int32 maxLength,
                char **pPtr) {
    ErrVal err = ENoErr;
    int64 rangeEndPos;
    int64 bufferEndPos;
    bool fServeFromActiveBuffer;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::GetPtr. startPos = " INT64FMT ", length = %d",
                startPos, length);

    // Reject bad arguments before touching the output parameter.
    if ((startPos < 0)
        || (length < 0)
        || (NULL == pPtr)) {
        returnErr(EFail);
    }
    *pPtr = NULL;
    rangeEndPos = startPos + length;

    err = SetPosition(startPos);
    if (err) {
        gotoErr(err);
    }

    // Decide whether the active buffer holds the entire range.
    fServeFromActiveBuffer = false;
    if ((NULL != m_pActiveIOBuffer) && (NULL != m_pFirstValidByte)) {
        bufferEndPos = m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes;
        fServeFromActiveBuffer = (rangeEndPos < bufferEndPos);
    }

    if (fServeFromActiveBuffer) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::GetPtr. Using a region in an existing buffer");

        m_pNextValidByte = m_pFirstValidByte + (startPos - m_pActiveIOBuffer->m_PosInMedia);
        *pPtr = m_pNextValidByte;
    } else if (NULL == pTempBuffer) {
        // A copy is required, but the caller gave us nowhere to put it.
        gotoErr(EFail);
    } else {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::GetPtr. Reading the data into a temporary buffer");
        if ((int64) maxLength < (rangeEndPos - startPos)) {
            gotoErr(EFail);
        }

        // Copy the range into the caller's scratch buffer.
        err = Read(pTempBuffer, (int32) (rangeEndPos - startPos));
        if (err) {
            gotoErr(err);
        }
        *pPtr = pTempBuffer;
    }

abort:
    returnErr(err);
} // GetPtr.







/////////////////////////////////////////////////////////////////////////////
//
// [GetPtrRef]
//
/////////////////////////////////////////////////////////////////////////////
// Returns a pointer directly into the active buffer at startPos, together
// with the number of bytes from startPos to the end of that buffer's valid
// data. Nothing is copied.
//
// startPos      - Absolute stream position; must be >= 0.
// length        - When > 0, the reported length is clipped to this value.
//                 Otherwise as many bytes as the buffer holds are reported.
// pPtr          - Receives the pointer; NULL if there is no active buffer.
// pActualLength - Receives the byte count; 0 if there is no active buffer.
//
// Returns EFail for bad arguments, otherwise the result of SetPosition.
//
// NOTE(review): the reported length is computed from the buffer's
// m_PosInMedia and m_NumValidBytes without clamping, so it presumably can be
// zero or negative for a buffer whose data has not arrived yet -- confirm.
ErrVal
CAsyncIOStream::GetPtrRef(
                     int64 startPos,
                     int32 length,
                     char **pPtr,
                     int32 *pActualLength) {
    ErrVal err = ENoErr;
    int64 bufferEndPos;
    int32 availableBytes;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::GetPtrRef. startPos = " INT64FMT ", length = %d",
                startPos, length);

    // Both output parameters are required, and the position must be valid.
    if ((startPos < 0)
        || (NULL == pPtr)
        || (NULL == pActualLength)) {
        returnErr(EFail);
    }
    *pActualLength = 0;
    *pPtr = NULL;

    err = SetPosition(startPos);
    if (err) {
        gotoErr(err);
    }

    if ((NULL != m_pActiveIOBuffer) && (NULL != m_pFirstValidByte)) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::GetPtrRef. Using a region in an existing buffer");

        bufferEndPos = m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes;
        availableBytes = (int32) (bufferEndPos - startPos);

        // A positive length caps the result; anything else means "as much as
        // the buffer holds".
        if ((length > 0) && (availableBytes > length)) {
            availableBytes = length;
        }

        *pPtr = m_pFirstValidByte + (startPos - m_pActiveIOBuffer->m_PosInMedia);
        *pActualLength = availableBytes;
    }

abort:
    returnErr(err);
} // GetPtrRef.









/////////////////////////////////////////////////////////////////////////////
//
// [Read]
//
/////////////////////////////////////////////////////////////////////////////
// Copies bytesToRead bytes from the current position into clientBuffer and
// advances the cursor. Each loop iteration copies what the active buffer
// holds and, if more is needed, calls SetPosition to move to the position
// just past that buffer.
//
// clientBuffer - Destination; must not be NULL.
// bytesToRead  - Number of bytes wanted; must be >= 0. Zero is a no-op.
//
// Returns ENoErr when every byte was copied. Returns EFail for bad arguments,
// a stream with no blockIO, or an active buffer whose cursors are not set.
// Returns EEOF when the stream is empty, or any error from SetPosition
// (including EEOF at the end of the data). When an error is returned, some
// bytes may already have been copied into clientBuffer.
ErrVal
CAsyncIOStream::Read(char *clientBuffer, int32 bytesToRead) {
    ErrVal err = ENoErr;
    int32 bytesInBuffer;
    int32 numBytes;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::Read. bytesToRead = %d", bytesToRead);

    // The stream must have been initialized.
    if ((NULL == m_pBlockIO)
        || (bytesToRead < 0)
        || (NULL == clientBuffer)) {
        gotoErr(EFail);
    }

    // Each loop reads from one buffer.
    while (bytesToRead > 0) {
        // Find out how many bytes are available in this buffer.
        numBytes = bytesToRead;
        if (NULL == m_pActiveIOBuffer) {
            numBytes = 0;
        } else if ((NULL == m_pNextValidByte) || (NULL == m_pEndValidBytes)) {
            // There is an active buffer but no cursor into it. Copying here
            // would read from a NULL source, or read with no upper bound, so
            // fail instead.
            gotoErr(EFail);
        } else if ((m_pNextValidByte + numBytes) >= m_pEndValidBytes) {
            // Clip the copy to the valid bytes left in this buffer.
            numBytes = (int32) (m_pEndValidBytes - m_pNextValidByte);
        }

        if (numBytes > 0) {
            memcpy(clientBuffer, m_pNextValidByte, numBytes);

            clientBuffer += numBytes;
            m_pNextValidByte += numBytes;
            bytesToRead = bytesToRead - numBytes;
        }

        // If we did not read all of the bytes we need, then we must
        // have hit the end of the buffer. In that case, read the
        // next buffer-full of data.
        if (bytesToRead > 0) {
            int64 currentPosition = 0;

            if (NULL != m_pActiveIOBuffer) {
                currentPosition = m_pActiveIOBuffer->m_PosInMedia;
            }

            if ((m_pEndValidBytes) && (m_pFirstValidByte)) {
                bytesInBuffer = (int32) (m_pEndValidBytes - m_pFirstValidByte);
            } else {
                bytesInBuffer = 0;
            }

            DEBUG_LOG_VERBOSE("CAsyncIOStream::Read. Call SetPosition for new position" INT64FMT,
                        currentPosition + bytesInBuffer);

            // Seek to the first byte past the current buffer. At the end of
            // the data SetPosition reports EEOF, which ends the read.
            err = SetPosition(currentPosition + bytesInBuffer);
            if (err) {
                gotoErr(err);
            }

            // Be careful.
            // You can always seek to 0 in an empty stream, but you cannot read.
            if (0 == m_TotalAvailableBytes) {
                DEBUG_LOG_VERBOSE("CAsyncIOStream::Read. Read an empty stream");
                gotoErr(EEOF);
            }
        } // reading the next buffer of data.
    } // while (bytesToRead > 0)

abort:
    returnErr(err);
} // Read





/////////////////////////////////////////////////////////////////////////////
//
// [Write]
//
/////////////////////////////////////////////////////////////////////////////
// Writes bytesToWrite bytes from clientBuffer to the stream, dispatching to
// the seekable-device or stream-device writer depending on the blockIO.
//
// Returns EFail for a NULL buffer, a negative count, or a stream with no
// blockIO; EEOF when the stream has no IO system; otherwise the result of
// the writer that was called.
ErrVal
CAsyncIOStream::Write(const char *clientBuffer, int32 bytesToWrite) {
    ErrVal err = ENoErr;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::Write. bytesToWrite = %d",
                bytesToWrite);

    // Reject bad arguments and streams that were never opened.
    if ((!clientBuffer)
        || (bytesToWrite < 0)
        || (NULL == m_pBlockIO)) {
        gotoErr(EFail);
    }

    // This happens when a stream has closed.
    if (NULL == m_pIOSystem) {
        gotoErr(EEOF);
    }

    // Non-seekable devices take the stream writer; everything else takes the
    // seekable writer.
    if (!(m_pBlockIO->m_fSeekable)) {
        err = WriteToStreamDevice(clientBuffer, bytesToWrite);
    } else {
        err = WriteToSeekableDevice(clientBuffer, bytesToWrite);
    }

abort:
    returnErr(err);
} // Write.







/////////////////////////////////////////////////////////////////////////////
//
// [WriteToSeekableDevice]
//
// Copies bytesToWrite bytes from clientBuffer into the active IO buffer,
// starting at the current stream position. Whenever there is no active
// buffer, or the active buffer is full, this either seeks to the write
// position (when overwriting data inside the stream) or parks the current
// buffer with MoveBufferToBackground and allocates a new one (when
// appending, or when the stream is an EXPANDING_MEMORY_STREAM).
//
// Every buffer that is written to is flagged UNSAVED_CHANGES and VALID_DATA,
// and m_TotalAvailableBytes grows if the write extends past the old end.
//
// Returns ENoErr on success, the error from SetPosition or
// MoveBufferToBackground, or EFail if no usable buffer space is available.
// This method does not take the stream lock itself; Write acquires it
// before calling here.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::WriteToSeekableDevice(const char *clientBuffer, int32 bytesToWrite) {
    ErrVal err = ENoErr;
    int64 writePos;
    int32 bytesCopiedToCurrentBuffer;
    int32 numFreeBytes;


    // The write position is the media position of the active buffer plus
    // the cursor offset inside it. With no active buffer we start at 0.
    if ((m_pActiveIOBuffer)
        && (m_pNextValidByte)
        && (m_pFirstValidByte)) {
        writePos = m_pActiveIOBuffer->m_PosInMedia + (m_pNextValidByte - m_pFirstValidByte);
    } else
    {
        writePos = 0;
    }

    DEBUG_LOG_VERBOSE("CAsyncIOStream::WriteToSeekableDevice. bytesToWrite = %d, writePos =" INT64FMT,
                bytesToWrite, writePos);

    // Each iteration of this loop writes some data.
    while (bytesToWrite > 0) {
        // If there is no current buffer, or we are at the end of
        // a buffer, then allocate a new empty buffer.
        if ((NULL == m_pFirstValidByte)
            || ((m_pNextValidByte)
                && (m_pLastPossibleValidByte)
                && (m_pNextValidByte >= m_pLastPossibleValidByte))) {
            // We are writing into the middle of the stream or at the end of the stream.
            // First, handle the case of writing into the middle of the stream.
            if ((writePos < GetDataLength())
                  && !(m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)) {
                err = SetPosition(writePos);
                if (err) {
                    gotoErr(err);
                }
            }
            // Now, handle the case of writing to the end of the stream.
            else {
                DEBUG_LOG_VERBOSE("CAsyncIOStream::WriteToSeekableDevice. Add a new buffer to end of stream. writePos =" INT64FMT,
                            writePos);

                // Save the current buffer.
                err = MoveBufferToBackground(m_pActiveIOBuffer);
                if (err) {
                    gotoErr(err);
                }

                // Allocate a new empty buffer.
                m_pActiveIOBuffer = AllocAsyncIOStreamBuffer(writePos, true);
                if (!m_pActiveIOBuffer) {
                     gotoErr(EFail);
                }

                // This will be adjusted for block alignment.
                writePos = m_pActiveIOBuffer->m_PosInMedia;

                m_pFirstValidByte = m_pActiveIOBuffer->m_pLogicalBuffer;
                m_pNextValidByte = m_pActiveIOBuffer->m_pLogicalBuffer;
                m_pEndValidBytes = m_pActiveIOBuffer->m_pLogicalBuffer;
                m_pLastPossibleValidByte = m_pActiveIOBuffer->m_pLogicalBuffer + m_pActiveIOBuffer->m_BufferSize;
            }
        } // allocating an empty buffer.

        // Everything below dereferences the active buffer, so fail cleanly
        // instead of crashing if the cursor pointers are set but the buffer
        // itself is missing.
        if (NULL == m_pActiveIOBuffer) {
            gotoErr(EFail);
        }

        // Now that we know this is the buffer we will be writing to,
        // record that we have written to it.
        m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::UNSAVED_CHANGES;
        m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::VALID_DATA;

        // Work out how much room is left between the cursor and the end
        // of the buffer.
        if ((m_pLastPossibleValidByte) && (m_pNextValidByte)) {
            numFreeBytes = (int32) (m_pLastPossibleValidByte - m_pNextValidByte);
        } else {
            numFreeBytes = 0;
        }

        // A buffer with no room here means the state is inconsistent;
        // bail out rather than loop forever.
        if ((numFreeBytes <= 0)
            || (NULL == m_pNextValidByte)
            || (NULL == m_pEndValidBytes)) {
            gotoErr(EFail);
        }

        // Copy as much as fits; the rest goes in the next buffer.
        bytesCopiedToCurrentBuffer = bytesToWrite;
        if (numFreeBytes < bytesCopiedToCurrentBuffer) {
            bytesCopiedToCurrentBuffer = numFreeBytes;
        }

        // Now, at this point we know there is enough room in the buffer
        // for the value. Copy the value to the buffer. We save it in
        // the buffer in the same format as it will appear in the file.
        memcpy(m_pNextValidByte, clientBuffer, bytesCopiedToCurrentBuffer);

        // The next value goes just after the value we just wrote.
        clientBuffer += bytesCopiedToCurrentBuffer;
        bytesToWrite = bytesToWrite - bytesCopiedToCurrentBuffer;
        writePos += bytesCopiedToCurrentBuffer;
        m_pNextValidByte += bytesCopiedToCurrentBuffer;

        // The data we just wrote is also readable.
        if (m_pNextValidByte > m_pEndValidBytes) {
            m_pEndValidBytes = m_pNextValidByte;
        }

        // Record the changes to the buffer, and grow the stream length
        // if this write extended past the previous end of data.
        m_pActiveIOBuffer->m_NumValidBytes = m_pEndValidBytes - m_pFirstValidByte;
        if ((m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes)
                 > m_TotalAvailableBytes) {
              m_TotalAvailableBytes = m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes;
        }
    } // while (bytesToWrite > 0)

abort:
    returnErr(err);
} // WriteToSeekableDevice.







/////////////////////////////////////////////////////////////////////////////
//
// [WriteToStreamDevice]
//
// Appends bytesToWrite bytes from clientBuffer to the active output buffer.
// When there is no output buffer yet, or the current one is full, the
// current buffer is handed to MoveBufferToBackground and a fresh buffer is
// allocated.
//
// Returns ENoErr on success, the error from MoveBufferToBackground, or EFail
// if a buffer cannot be allocated or has no free space.
// This method does not take the stream lock itself; Write acquires it
// before calling here.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::WriteToStreamDevice(const char *clientBuffer, int32 bytesToWrite) {
    ErrVal err = ENoErr;
    int32 chunkSize;

    DEBUG_LOG_VERBOSE("CAsyncIOStream::WriteToStreamDevice. bytesToWrite = %d", bytesToWrite);

    // Every pass through the loop copies one chunk into one buffer.
    while (bytesToWrite > 0) {
        // Start a new buffer when we have none, or when the write cursor
        // has reached the end of the current one.
        if ((!m_pFirstValidOutputByte)
            || ((m_pNextValidOutputByte)
                && (m_pLastPossibleValidOutputByte)
                && (m_pNextValidOutputByte >= m_pLastPossibleValidOutputByte))) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::WriteToStreamDevice. Add a new buffer to end of stream.");

            // Park the buffer we have been filling.
            err = MoveBufferToBackground(m_pActiveOutputIOBuffer);
            if (err) {
                gotoErr(err);
            }

            // The media position is irrelevant for a write to a PIPE,
            // so the new buffer is simply allocated at position 0.
            m_pActiveOutputIOBuffer = AllocAsyncIOStreamBuffer(0, false);
            if (!m_pActiveOutputIOBuffer) {
                gotoErr(EFail);
            }

            // NOTE(review): the end pointer is derived from m_pPhysicalBuffer
            // while the start pointers use m_pLogicalBuffer (the seekable
            // writer uses m_pLogicalBuffer for both) - confirm this is
            // intended.
            m_pLastPossibleValidOutputByte = m_pActiveOutputIOBuffer->m_pPhysicalBuffer + m_pActiveOutputIOBuffer->m_BufferSize;
            m_pNextValidOutputByte = m_pActiveOutputIOBuffer->m_pLogicalBuffer;
            m_pFirstValidOutputByte = m_pActiveOutputIOBuffer->m_pLogicalBuffer;
        } // starting a new buffer.

        // This buffer is about to receive data, so mark it dirty and valid.
        m_pActiveOutputIOBuffer->m_BufferFlags |= (CIOBuffer::UNSAVED_CHANGES | CIOBuffer::VALID_DATA);

        // Free space is whatever lies between the cursor and the buffer end.
        chunkSize = ((m_pLastPossibleValidOutputByte) && (m_pNextValidOutputByte))
                        ? (int32) (m_pLastPossibleValidOutputByte - m_pNextValidOutputByte)
                        : 0;
        if ((!m_pNextValidOutputByte) || (chunkSize <= 0)) {
            gotoErr(EFail);
        }

        // Never copy more than the caller asked for.
        if (bytesToWrite < chunkSize) {
            chunkSize = bytesToWrite;
        }

        // The bytes are stored in the buffer exactly as the caller
        // supplied them.
        memcpy(m_pNextValidOutputByte, clientBuffer, chunkSize);

        // Advance both cursors past the chunk and update the bookkeeping.
        bytesToWrite -= chunkSize;
        clientBuffer += chunkSize;
        m_pNextValidOutputByte += chunkSize;
        m_pActiveOutputIOBuffer->m_NumValidBytes = m_pNextValidOutputByte - m_pFirstValidOutputByte;
    } // while (bytesToWrite > 0)

abort:
    returnErr(err);
} // WriteToStreamDevice.








/////////////////////////////////////////////////////////////////////////////
//
// [RemoveNBytes]
//
// Removes bytesToRemove bytes starting at stream position startPos.
// Every input buffer in m_IOBufferList that overlaps or follows the removed
// range is shifted, trimmed, split or discarded so that the remaining data
// stays contiguous. The active-buffer pointers are then cleared, the stream
// length and async-load positions are reduced by bytesToRemove, and the
// removal is forwarded to the block IO device.
//
// Returns EFail if there is no block IO device, if either argument is
// negative, or if a split buffer cannot be allocated. Otherwise returns the
// error from MoveBufferToBackground or m_pBlockIO->RemoveNBytes.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::RemoveNBytes(int64 startPos, int32 bytesToRemove) {
    ErrVal err = ENoErr;
    CIOBuffer *pBuffer;
    CIOBuffer *pNewBuffer = NULL;
    CIOBuffer *pNextBufferToCheck;
    int32 bytesToRemoveFromThisBuffer;
    int32 offset;
    int32 bytesInNewBuffer;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::RemoveNBytes. startPos = " INT64FMT ", bytesToRemove = %d",
                startPos, bytesToRemove);

    // The stream must have been initialized.
    if ((NULL == m_pBlockIO)
        || (startPos < 0)
        || (bytesToRemove < 0)) {
        gotoErr(EFail);
    }

    // Hand the active buffer back before walking the buffer list; the
    // active-buffer pointers are reset after the loop.
    err = MoveBufferToBackground(m_pActiveIOBuffer);
    if (err) {
        gotoErr(err);
    }

    pBuffer = m_IOBufferList.GetHead();
    while (pBuffer) {
        // We may delete this buffer, so get the next buffer now,
        // while the current buffer is still valid.
        pNextBufferToCheck = pBuffer->m_StreamBufferList.GetNextInQueue();

        // Adjust any buffers affected by this. There are several possible
        // different cases, depending on how the buffer overlaps with the
        // region we are removing.
        //
        // pBuffer:         [xxxxxxxxxxxxxxxxxxxxxxxxx]
        // Case 1: [.....]
        // Case 2: [....................]
        // Case 3: [.......................................]
        // Case 4:                [..............]
        // Case 5:                [........................]
        //
        // Buffers that end at or before startPos are untouched, as are
        // buffers that are not input buffers.
        if ((pBuffer->m_BufferFlags & CIOBuffer::INPUT_BUFFER)
            // Don't adjust a split buffer that we just created.
            && (pNewBuffer != pBuffer)
            && (startPos < (pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes))) {
            ////////////////////////////////
            // Case 1.
            // Check if the range occurs entirely before the beginning
            // of the buffer. This is an easy case, just change the start position
            // of this buffer.
            if ((startPos + bytesToRemove) <= pBuffer->m_PosInMedia) {
                pBuffer->m_PosInMedia = pBuffer->m_PosInMedia - bytesToRemove;
            }
            ////////////////////////////////
            // Case 3 and Case 5.
            // Otherwise, check if the range partly overlaps the end of the buffer.
            // This is usually an easy case, just shorten the buffer. It may
            // span the entire buffer.
            else if ((startPos + bytesToRemove)
                        >= (pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes)) {
                // In Case 3 startPos lies before the buffer, so this count
                // exceeds m_NumValidBytes and the subtraction below goes
                // negative; the <= 0 test then discards the whole buffer.
                bytesToRemoveFromThisBuffer
                    = (int32) ((pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes) - startPos);

                pBuffer->m_NumValidBytes = pBuffer->m_NumValidBytes - bytesToRemoveFromThisBuffer;
                // It may be 0 if the removed bytes cover this entire buffer and
                // part of at least one adjacent buffer.
                if (pBuffer->m_NumValidBytes <= 0) {
                    pBuffer->m_StreamBufferList.RemoveFromQueue();
                    // If this is waiting on an asynch operation, then
                    // let the operation finish before we close the file.
                    if ((CIOBuffer::READ == pBuffer->m_BufferOp)
                        || (CIOBuffer::WRITE == pBuffer->m_BufferOp))
                    {
                        pBuffer->m_BufferFlags |= CIOBuffer::DISCARD_WHEN_IDLE;
                    }
                    else
                    {
                        // Discard this block.
                        RELEASE_OBJECT(pBuffer);
                    }
                }
            } // The range overlaps the end of the buffer.
            ////////////////////////////////
            // Case 2.
            // Otherwise, check if the range overlaps the beginning of the buffer.
            // This is an easy case, just trim from the front of the buffer.
            else if (startPos <= pBuffer->m_PosInMedia) {
                bytesToRemoveFromThisBuffer
                    = (int32) ((startPos + bytesToRemove) - pBuffer->m_PosInMedia);

                pBuffer->m_NumValidBytes = pBuffer->m_NumValidBytes
                    - bytesToRemoveFromThisBuffer;

                // The surviving data now begins where the removed range began.
                pBuffer->m_PosInMedia = startPos;

                // Skip over the removed bytes by advancing the logical start
                // of the buffer; no data is moved.
                pBuffer->m_pLogicalBuffer += bytesToRemoveFromThisBuffer;
            } // Remove data from the front of the buffer.

            ////////////////////////////////
            // Case 4.
            // Otherwise, we have the worst case. Bytes are removed from
            // the middle of a buffer.
            else {
                // bytesInNewBuffer is the data after the removed range;
                // offset is where that data starts inside pBuffer.
                bytesInNewBuffer
                    = (int32) ((pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes)
                                    - (startPos + bytesToRemove));
                offset = (int32) ((startPos + bytesToRemove) - pBuffer->m_PosInMedia);

                // Allocate a new empty IO buffer for the data after the range.
                // NOTE(review): other callers pass a media position as the
                // first argument; here a byte count is passed and
                // m_PosInMedia is overwritten just below - confirm intended.
                pNewBuffer = AllocAsyncIOStreamBuffer(bytesInNewBuffer, true);
                if (!pNewBuffer) {
                    gotoErr(EFail);
                }
                pNewBuffer->m_PosInMedia = startPos;
                pNewBuffer->m_NumValidBytes = bytesInNewBuffer;

                memcpy(
                    pNewBuffer->m_pLogicalBuffer,
                    pBuffer->m_pLogicalBuffer + offset,
                    pNewBuffer->m_NumValidBytes);

                // Now, remove the data from the first buffer.
                pBuffer->m_NumValidBytes = (int32) (startPos - pBuffer->m_PosInMedia);
            } // Remove data from the middle of the buffer.
        } // Adjust any buffers affected by this.

        pBuffer = pNextBufferToCheck;
    } // Updating all buffers.


    // There is no active buffer after a removal; the next operation must
    // establish one again.
    m_pActiveIOBuffer = NULL;
    m_pFirstValidByte = NULL;
    m_pNextValidByte = NULL;
    m_pEndValidBytes = NULL;
    m_pLastPossibleValidByte = NULL;

    // NOTE(review): these are reduced by the full bytesToRemove without
    // clamping to the data that actually existed past startPos - confirm
    // callers never ask to remove beyond the end of the stream.
    m_TotalAvailableBytes = m_TotalAvailableBytes - bytesToRemove;

    m_NextLoadStartPosition    = m_NextLoadStartPosition - bytesToRemove;
    m_NextAsynchBufferPosition = m_NextAsynchBufferPosition - bytesToRemove;

    // Let the underlying device apply the same removal.
    if (m_pBlockIO) {
        err = m_pBlockIO->RemoveNBytes(bytesToRemove);
        if (err) {
            gotoErr(err);
        }
    }

abort:
    returnErr(err);
} // RemoveNBytes.









/////////////////////////////////////////////////////////////////////////////
//
// [CopyStream]
//
// Copies numBytesToCopy bytes, starting at the current position of this
// stream, into destStream. Pass COPY_TO_EOF to copy everything from the
// current position to the end of the data.
//
// With fTransferOwnerShip set, this writes without copying: each source
// buffer is unlinked from this stream, linked into destStream, and handed
// to destStream's block IO with WriteBlockAsync. Any buffers the destination
// had accumulated are dropped first. Without fTransferOwnerShip, the reader
// (the source of the copy) retains the blocks and the bytes are copied
// with destStream->Write.
//
// Both stream locks are held for the duration of the copy. On exit the
// source position is set to the first byte that was not copied.
//
// Read may return an error if the server suddenly disconnects
// due to a crash.
//
// Write may return an error, if the remote client closes the
// connection. That happens when we return an error response
// or an old object that the client already has in cache.
//
// Returns EFail for a NULL or uninitialized destStream or a negative
// numBytesToCopy (other than COPY_TO_EOF). EEOF from SetPosition is treated
// as a normal end of the copy and reported as ENoErr.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::CopyStream(
                  CAsyncIOStream *destStream,
                  int64 numBytesToCopy,
                  bool fTransferOwnerShip) {
    ErrVal err = ENoErr;
    ErrVal err2 = ENoErr;
    int64 srcStartPos;
    // Initialized so that no path can advance the position by an
    // indeterminate amount.
    int32 numBytes = 0;
    CIOBuffer *pBuffer;
    int32 startOffsetInBuffer;
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::CopyStream. numBytesToCopy = " INT64FMT ", fTransferOwnerShip = %d",
                numBytesToCopy, fTransferOwnerShip);

    // Validate before taking any lock, so the early return has nothing
    // to release. The COPY_TO_EOF sentinel must survive the range check.
    if ((!destStream)
        || !(destStream->m_pLock)
        || !(destStream->m_pBlockIO)
        || ((numBytesToCopy < 0)
            && (CAsyncIOStream::COPY_TO_EOF != numBytesToCopy))) {
        returnErr(EFail);
    }

    m_pLock->Lock();
    destStream->m_pLock->Lock();

    // Remember where the copy starts; this cursor is advanced as we go.
    srcStartPos = GetPosition();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::CopyStream. srcStartPos = " INT64FMT, srcStartPos);

    // The user can specify the number of bytes to copy, or else
    // we just copy from here to the end of the stream.
    if (CAsyncIOStream::COPY_TO_EOF == numBytesToCopy) {
        numBytesToCopy = GetDataLength() - srcStartPos;
    }
    // numBytesToCopy is 64 bits wide, so it needs the 64-bit format.
    DEBUG_LOG_VERBOSE("CAsyncIOStream::CopyStream. numBytesToCopy = " INT64FMT, numBytesToCopy);

    if (fTransferOwnerShip) {
        // Write any unsaved output buffer.
        err = MoveBufferToBackground(m_pActiveIOBuffer);
        if (err) {
            gotoErr(err);
        }
        err = destStream->MoveBufferToBackground(destStream->m_pActiveIOBuffer);
        if (err) {
            gotoErr(err);
        }
        err = MoveBufferToBackground(m_pActiveOutputIOBuffer);
        if (err) {
            gotoErr(err);
        }
        err = destStream->MoveBufferToBackground(destStream->m_pActiveOutputIOBuffer);
        if (err) {
            gotoErr(err);
        }

        // We are writing data without buffering, so
        // remove any previously accumulated buffers.
        destStream->m_pActiveIOBuffer = NULL;
        destStream->m_pFirstValidByte = NULL;
        destStream->m_pNextValidByte = NULL;
        destStream->m_pEndValidBytes = NULL;
        destStream->m_pLastPossibleValidByte = NULL;

        destStream->m_pActiveOutputIOBuffer = NULL;
        destStream->m_pFirstValidOutputByte = NULL;
        destStream->m_pNextValidOutputByte = NULL;
        destStream->m_pLastPossibleValidOutputByte = NULL;
        destStream->m_TotalAvailableBytes = 0;
    }

    // Each loop transfers from one buffer.
    while (numBytesToCopy > 0) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::CopyStream. srcStartPos = " INT64FMT, srcStartPos);
        err = SetPosition(srcStartPos);
        if (err) {
            // Running off the end of the source just ends the copy.
            if (EEOF == err) {
                err = ENoErr;
            }
            gotoErr(err);
        }

        pBuffer = m_pActiveIOBuffer;
        if (fTransferOwnerShip) {
            // Without an active buffer there is nothing to hand over.
            if (!pBuffer) {
                break;
            }

            // Find out how many bytes are available in this buffer.
            // The whole buffer is handed over, so everything from the
            // current offset to the end of its valid data is consumed.
            startOffsetInBuffer = (int32) (srcStartPos - pBuffer->m_PosInMedia);
            numBytes = (int32) (pBuffer->m_NumValidBytes - startOffsetInBuffer);
            if (numBytes <= 0) {
                // No progress is possible; stop instead of spinning.
                break;
            }

            // Remove the buffer from the source stream.
            m_IOBufferList.RemoveFromQueue(&(pBuffer->m_StreamBufferList));
            m_pActiveIOBuffer = NULL;
            m_pFirstValidByte = NULL;
            m_pNextValidByte = NULL;
            m_pEndValidBytes = NULL;
            m_pLastPossibleValidByte = NULL;
            m_pActiveOutputIOBuffer = NULL;
            m_pFirstValidOutputByte = NULL;
            m_pNextValidOutputByte = NULL;
            m_pLastPossibleValidOutputByte = NULL;

            // Insert the buffer into the destination stream, which owns
            // the buffer from now on.
            destStream->m_IOBufferList.InsertHead(&(pBuffer->m_StreamBufferList));

            pBuffer->m_BufferFlags |= CIOBuffer::VALID_DATA;
            pBuffer->m_BufferFlags |= CIOBuffer::OUTPUT_BUFFER;

            destStream->m_pBlockIO->WriteBlockAsync(pBuffer, startOffsetInBuffer);
        } // if (fTransferOwnerShip)
        else {
            if ((m_pEndValidBytes) && (m_pNextValidByte)) {
                numBytes = (int32) (m_pEndValidBytes - m_pNextValidByte);
            } else {
                break;
            }
            if (numBytes > numBytesToCopy) {
                numBytes = (int32) numBytesToCopy;
            }
            // An empty buffer would leave the cursor where it is and
            // repeat this iteration forever.
            if (numBytes <= 0) {
                break;
            }

            DEBUG_LOG_VERBOSE("CAsyncIOStream::CopyStream. Writing bytes, numBytes = %d", numBytes);
            err = destStream->Write(m_pNextValidByte, numBytes);
            if (err) {
                gotoErr(err);
            }

            if (m_pNextValidByte) {
                m_pNextValidByte += numBytes;
            }
        } // if (!fTransferOwnerShip)

        srcStartPos += numBytes;
        numBytesToCopy = numBytesToCopy - numBytes;
    } // while (numBytesToCopy > 0)

abort:
    // Leave the source positioned just past the data that was copied.
    DEBUG_LOG_VERBOSE("CAsyncIOStream::CopyStream. srcStartPos = " INT64FMT, srcStartPos);
    err2 = SetPosition(srcStartPos);
    // Only report the reposition error if the copy itself succeeded.
    if ((!err) && (err2)) {
        if (EEOF == err2) {
            err2 = ENoErr;
        }
        err = err2;
    }

    m_pLock->Unlock();
    destStream->m_pLock->Unlock();

    returnErr(err);
} // CopyStream.







/////////////////////////////////////////////////////////////////////////////
//
// [ListenForNBytes]
//
// Starts an ASYNCH_LOAD_NBYTES load covering nBytes bytes beginning at
// startPos. A negative startPos means "continue from
// m_NextLoadStartPosition".
//
// The load finishes immediately (FinishAsyncLoad) when all data is already
// in buffers, when this is an expanding memory stream, or when the data
// length already reaches the stop position. Otherwise ContinueAsyncLoad is
// called. Any failure is also reported through FinishAsyncLoad.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::ListenForNBytes(int64 startPos, int64 nBytes) {
    ErrVal err = ENoErr;
    int64 bytesReady;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForNBytes. startPos = " INT64FMT ", nBytes = " INT64FMT,
                startPos, nBytes);

    // An uninitialized stream or a negative length is a caller error.
    if ((nBytes < 0) || (NULL == m_pBlockIO)) {
        gotoErr(EFail);
    }

    // A negative start position selects the saved load cursor.
    if (startPos < 0) {
        startPos = m_NextLoadStartPosition;
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForNBytes. startPos = " INT64FMT ", nBytes = " INT64FMT,
                    startPos, nBytes);
    }

    // Only one asynch load may be outstanding at a time.
    if (NOT_LOADING != m_AsynchLoadType) {
        gotoErr(EFail);
    }

    // Record what we are waiting for.
    m_AsynchLoadType = ASYNCH_LOAD_NBYTES;
    m_LoadStartPosition = startPos;
    m_LoadStopPosition = startPos + nBytes;
    DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForNBytes. m_LoadStopPosition = " INT64FMT,
                m_LoadStopPosition);

    if (m_AsyncIOStreamFlags & (ALL_DATA_IS_IN_BUFFERS | EXPANDING_MEMORY_STREAM)) {
        // Nothing more can arrive, so complete right away.
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForNBytes. ALL_DATA_IS_IN_BUFFERS");
        FinishAsyncLoad(ENoErr);
    } else {
        bytesReady = GetDataLength();
        if (bytesReady >= m_LoadStopPosition) {
            // The requested range is already present.
            DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForNBytes. All bytes are already available. bytesAvalilableNow = " INT64FMT,
                        bytesReady);
            FinishAsyncLoad(ENoErr);
        } else {
            // Otherwise go and get more data.
            ContinueAsyncLoad(ENoErr, false);
        }
    }

abort:
    if (err) {
        FinishAsyncLoad(err);
    }
} // ListenForNBytes.







/////////////////////////////////////////////////////////////////////////////
//
// [ListenForMoreBytes]
//
// Starts an ASYNCH_LOAD_ANY_BYTES load beginning at m_NextLoadStartPosition.
// The load completes as soon as any data past that position is available.
//
// Calling this while an ASYNCH_LOAD_ANY_BYTES load is already in progress
// is a harmless no-op. Calling it during any other kind of load fails, and
// the failure is reported through FinishAsyncLoad(EFail).
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::ListenForMoreBytes() {
    ErrVal err = ENoErr;
    int64 bytesAvalilableNow;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForMoreBytes");

    // The stream must have been initialized.
    if (NULL == m_pBlockIO) {
        gotoErr(EFail);
    }
    // If this stream is in the middle of an asynch op, then don't
    // start a new one. This is not an error case.
    // The KeepAlives may call this twice, once when the KeepAlive
    // is set, and again when a read with keepAlive is completed. Neither
    // can assume the other will always run, so both must run to be safe.
    // So, don't freak out if the other has already started the next read.
    if (ASYNCH_LOAD_ANY_BYTES == m_AsynchLoadType) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForMoreBytes. ASYNCH_LOAD_ANY_BYTES");
        gotoErr(ENoErr);
    } else if (NOT_LOADING != m_AsynchLoadType) {
        // Some other kind of load is in progress.
        gotoErr(EFail);
    }

    m_AsynchLoadType = ASYNCH_LOAD_ANY_BYTES;
    m_LoadStartPosition = m_NextLoadStartPosition;
    // There is no fixed stop position for this kind of load.
    m_LoadStopPosition = -1;

    // When nothing more can arrive, finish immediately.
    if ((m_AsyncIOStreamFlags & ALL_DATA_IS_IN_BUFFERS)
       || (m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForMoreBytes. ALL_DATA_IS_IN_BUFFERS");
        FinishAsyncLoad(ENoErr);
        goto abort;
    }

    // Any data past the start position satisfies the request.
    bytesAvalilableNow = GetDataLength();
    if (bytesAvalilableNow > m_LoadStartPosition) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForMoreBytes. All bytes are already available. bytesAvalilableNow = " INT64FMT,
                    bytesAvalilableNow);
        FinishAsyncLoad(ENoErr);
    } else {
        ContinueAsyncLoad(ENoErr, false);
    }

abort:
    if (err) {
        FinishAsyncLoad(err);
    }
} // ListenForMoreBytes.






/////////////////////////////////////////////////////////////////////////////
//
// [ListenForAllBytesToEOF]
//
// Starts an ASYNCH_LOAD_TO_EOF load beginning at m_NextLoadStartPosition.
// The load finishes immediately when all data is already in buffers or this
// is an expanding memory stream; otherwise ContinueAsyncLoad is called.
// Any failure is reported through FinishAsyncLoad.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::ListenForAllBytesToEOF() {
    ErrVal err = ENoErr;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForAllBytesToEOF");

    // The stream must be initialized, and only one asynch load may be
    // outstanding at a time.
    if ((NULL == m_pBlockIO) || (NOT_LOADING != m_AsynchLoadType)) {
        gotoErr(EFail);
    }

    // Record what we are waiting for. There is no fixed stop position.
    m_LoadStopPosition = -1;
    m_LoadStartPosition = m_NextLoadStartPosition;
    m_AsynchLoadType = ASYNCH_LOAD_TO_EOF;

    if (m_AsyncIOStreamFlags & (ALL_DATA_IS_IN_BUFFERS | EXPANDING_MEMORY_STREAM)) {
        // Nothing more can arrive, so complete right away.
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ListenForAllBytesToEOF. ALL_DATA_IS_IN_BUFFERS");
        FinishAsyncLoad(ENoErr);
    } else {
        ContinueAsyncLoad(ENoErr, false);
    }

abort:
    if (err) {
        FinishAsyncLoad(err);
    }
} // ListenForAllBytesToEOF.







/////////////////////////////////////////////////////////////////////////////
//
// [ContinueAsyncLoad]
//
// Decides whether the asynch load in progress is complete. If it is, or if
// resultErr reports a failure, FinishAsyncLoad is called. Otherwise more
// data is requested: a seekable device gets a new buffer and a
// ReadBlockAsync, a non-seekable device gets a read timeout started.
//
// fReceivedNewData is only logged here.
//
// This method cannot hold the lock while it calls a callback, since that may
// cause a deadlock. The AsyncIOStream object and the callback object may have
// separate locks. The callback object may run at any time, such as in response
// to a user action or a timer, or an I/O event on another data stream. This
// method and a few others are indirectly called by the selectThread, so they
// start a new call-chain. The callback object must be able to hold its lock while
// calling the AsyncIOStream, so the AsyncIOStream cannot hold its lock while calling
// the callback.
//
// This only affects OnBlockIOEvent, ContinueAsyncLoad and FinishAsyncLoad
// since these are the only methods that are called indirectly
// by the select thread.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::ContinueAsyncLoad(ErrVal resultErr, bool fReceivedNewData) {
    ErrVal err = ENoErr;
    int64 numAvailBytes;
    CIOBuffer *pBuffer = NULL;
    ErrVal finishAsyncLoadErr = ENoErr;
    bool fCallFinishAsyncLoad = false;

    DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: resultErr = %d, fReceivedNewData = %d",
            resultErr, fReceivedNewData);

    ////////////////////////////////////////////////
    // Everything in this scope runs under the stream lock. The jumps out
    // of the scope leave it before FinishAsyncLoad is called.
    {
        AutoLock(m_pLock);

        // Run any standard debugger checks.
        RunChecks();

        // The stream must have been initialized.
        if ((NULL == m_pBlockIO) || (NOT_LOADING == m_AsynchLoadType)) {
            gotoErr(EFail);
        }
        // A failed IO ends the load with that error.
        if (resultErr) {
            fCallFinishAsyncLoad = true;
            finishAsyncLoadErr = resultErr;
            goto finishOutsideCritSec;
        }

        // These values are 64 bits wide, so they need the 64-bit format.
        numAvailBytes = GetDataLength();
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: numAvailBytes = " INT64FMT, numAvailBytes);
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: m_LoadStartPosition = " INT64FMT, m_LoadStartPosition);
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: m_AsynchLoadType = %d", m_AsynchLoadType);

        // An "any bytes" load is done once anything past the start has
        // arrived; an "N bytes" load is done once the stop position is
        // reached.
        if (((ASYNCH_LOAD_ANY_BYTES == m_AsynchLoadType)
                && (numAvailBytes > m_LoadStartPosition))
            || ((ASYNCH_LOAD_NBYTES == m_AsynchLoadType)
                && (numAvailBytes >= m_LoadStopPosition))) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: Call FinishAsyncLoad");

            fCallFinishAsyncLoad = true;
            finishAsyncLoadErr = ENoErr;
            goto finishOutsideCritSec;
        }

        // This check is important. If we read a partial block from a file
        // then the next read may be rounded down to a previous block.
        // As a result, we could read the last block several times.
        // This check avoids that.
        if ((m_pBlockIO->m_fSeekable)
            && (numAvailBytes >= m_pBlockIO->GetMediaSize())) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: Call FinishAsyncLoad");

            fCallFinishAsyncLoad = true;
            finishAsyncLoadErr = ENoErr;
            goto finishOutsideCritSec;
        }

        // At this point, we have decided that we need more data.
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: we need more data");

        // If this is a stream (non-seekable) blockIO, then buffers will
        // be allocated by the blockIO when they arrive. If this
        // is a seekable, however, then we need to request the read.
        if (m_pBlockIO->m_fSeekable) {
            // Allocate a new empty IO buffer.
            pBuffer = AllocAsyncIOStreamBuffer(numAvailBytes, true);
            if (!pBuffer) {
                gotoErr(EFail);
            }
            pBuffer->m_PosInMedia = m_NextAsynchBufferPosition;

            // Now, fill the new buffer. We always do this if we are
            // reading, or if we are either reading or writing and
            // there is data in the backing store.
            DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: Call ReadBlockAsync. pos = " INT64FMT,
                        (int64) m_NextAsynchBufferPosition);
            m_pBlockIO->ReadBlockAsync(pBuffer);
        }
        // Otherwise, if this is a stream (non-seekable) blockIO, then
        // new data may never arrive. Start a timeout, which will trigger
        // an I/O error if data does not arrive in a reasonable time.
        else { // if (!(m_pBlockIO->m_fSeekable))
            DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: Call StartTimeout");
            m_pBlockIO->StartTimeout(CIOBuffer::READ);
        }
    } ////////////////////////////////////////////////

finishOutsideCritSec:
    if (fCallFinishAsyncLoad) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::ContinueAsyncLoad: Calling FinishAsyncLoad");
        FinishAsyncLoad(finishAsyncLoadErr);
    }
    return;

abort:
    // Drop a buffer that was allocated but never handed to the device.
    if (pBuffer) {
        pBuffer->m_StreamBufferList.RemoveFromQueue();
        RELEASE_OBJECT(pBuffer);
    }

    // This is actually a success case.
    if ((EEOF == err)
        && ((ASYNCH_LOAD_TO_EOF == m_AsynchLoadType)
            || (ASYNCH_LOAD_ANY_BYTES == m_AsynchLoadType))) {
        err = ENoErr;
    }

    FinishAsyncLoad(err);
} // ContinueAsyncLoad.







/////////////////////////////////////////////////////////////////////////////
//
// [FinishAsyncLoad]
//
// Completes the asynchronous load that is currently in progress. Under the
// lock, this marks the stream as NOT_LOADING and, if the load succeeded,
// moves the stream position back to m_LoadStartPosition (for the NBYTES and
// ANY_BYTES load types) and advances m_NextLoadStartPosition. It then reports
// the result to the event handler through OnReadyToRead.
//
// resultErr - The result of the load. If this is an error, then the stream
//    position and m_NextLoadStartPosition are left unchanged, but the event
//    handler is still called and is passed this error.
//
// This method cannot hold the lock while it calls a callback, since that may
// cause a deadlock. The AsyncIOStream object and the callback object may have
// separate locks. The callback object may run at any time, such as in response
// to a user action or a timer, or an I/O event on another data stream. This
// method and a few others are indirectly called by the selectThread, so they
// start a new call-chain. The callback object must be able to hold its lock while
// calling the AsyncIOStream, so the AsyncIOStream cannot hold its lock while calling
// the callback.
//
// This only affects OnBlockIOEvent, ContinueAsyncLoad and FinishAsyncLoad
// since these are the only methods that are called by the select thread.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::FinishAsyncLoad(ErrVal resultErr) {
    int64 totalAvailBytes;
    int32 oldAsyncLoadType;
    CAsyncIOEventHandler *pEventHandler = NULL;

    DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishAsyncLoad. resultErr= %d", resultErr);

    ///////////////////////////////////////////////////////
    {
        AutoLock(m_pLock);

        // Run any standard debugger checks.
        RunChecks();

        totalAvailBytes = GetDataLength();

        // totalAvailBytes is a 64-bit value, so it must be printed with
        // INT64FMT. Printing it with %d is a format/argument mismatch.
        DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishAsyncLoad. totalAvailBytes = " INT64FMT ", m_AsynchLoadType = %d",
                    totalAvailBytes, m_AsynchLoadType);

        // Update the state before calling the callback; the callback may
        // re-entrantly start a new load.
        oldAsyncLoadType = m_AsynchLoadType;
        m_AsynchLoadType = NOT_LOADING;

        if (!resultErr) {
            // Get the returned data. Rewind to where this load started, so
            // the caller reads the newly loaded bytes first.
            if ((ASYNCH_LOAD_NBYTES == oldAsyncLoadType)
                || (ASYNCH_LOAD_ANY_BYTES == oldAsyncLoadType)) {
                DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishAsyncLoad. m_LoadStartPosition= " INT64FMT,
                            m_LoadStartPosition);

                (void) SetPosition(m_LoadStartPosition);
            }

            // Advance the cursor. A fixed-size load resumes where it was
            // asked to stop; the other load types resume after all of the
            // data that is currently available.
            if (ASYNCH_LOAD_NBYTES == oldAsyncLoadType) {
                m_NextLoadStartPosition = m_LoadStopPosition;
            } else if (ASYNCH_LOAD_ANY_BYTES == oldAsyncLoadType) {
                m_NextLoadStartPosition = totalAvailBytes;
            } else if (ASYNCH_LOAD_TO_EOF == oldAsyncLoadType) {
                m_NextLoadStartPosition = totalAvailBytes;
            }

            DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishAsyncLoad. m_NextLoadStartPosition= " INT64FMT,
                       m_NextLoadStartPosition);
        } // if (!resultErr)

        // Prepare to call the callback. Only notify if a load was really in
        // progress. Hold a reference to the handler so it stays valid after
        // we leave the lock.
        if (NULL != m_pEventHandler) {
            if ((ASYNCH_LOAD_NBYTES == oldAsyncLoadType)
                || (ASYNCH_LOAD_ANY_BYTES == oldAsyncLoadType)
                || (ASYNCH_LOAD_TO_EOF == oldAsyncLoadType)) {
                pEventHandler = m_pEventHandler;
                ADDREF_OBJECT(pEventHandler);
            }
        }
    } /////////////////////////////////////////////////


    // Call the callback outside the lock.
    if (NULL != pEventHandler) {
        pEventHandler->OnReadyToRead(
                              resultErr,
                              totalAvailBytes,
                              this,
                              m_pEventHandlerContext);
        RELEASE_OBJECT(pEventHandler);
    }
} // FinishAsyncLoad.






/////////////////////////////////////////////////////////////////////////////
//
// [OnBlockIOEvent]
//
// This is called when a blockIO completes a read or a write.
// It is part of the CAsyncBlockIOCallback base class.
//
// pBuffer - The buffer whose I/O operation just finished. A NULL buffer is
//    ignored. The result of the operation is read from pBuffer->m_Err.
//
// All state changes are made while holding the lock. Any follow-up work that
// may call the event handler (ContinueAsyncLoad, FinishAsyncLoad, FinishFlush
// and OnStreamDisconnect) is only recorded in local flags inside the locked
// region, and is carried out after the locked region has been left.
//
// This method cannot hold the lock while it calls a callback, since that may
// cause a deadlock. The AsyncIOStream object and the callback object may have
// separate locks. The callback object may run at any time, such as in response
// to a user action or a timer, or an I/O event on another data stream. This
// method and a few others are indirectly called by the selectThread, so they
// start a new call-chain. The callback object must be able to hold its lock while
// calling the AsyncIOStream, so the AsyncIOStream cannot hold its lock while calling
// the callback.
//
// This only affects OnBlockIOEvent, ContinueAsyncLoad and FinishAsyncLoad
// since these are the only methods that are called by the select thread.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::OnBlockIOEvent(CIOBuffer *pBuffer) {
    ErrVal err = ENoErr;
    ErrVal bufferErr = ENoErr;
    ErrVal continueAsyncLoadErr = ENoErr;
    ErrVal finishAsyncLoadErr = ENoErr;
    ErrVal eventErr = ENoErr;
    bool fCallContinueAsyncLoad = false;
    bool fCallFinishAsyncLoad = false;
    bool fCallOnFlush = false;
    bool fCallOnError = false;
    CIOBuffer *pPrevBuffer = NULL;

    DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: this = %p, pBuffer = %p.", this, pBuffer);

    ///////////////////////////////////////////////////
    {
        AutoLock(m_pLock);

        if (NULL == pBuffer) {
            gotoErr(ENoErr);
        }

        // A handy place to put a breakpoint.
#if DD_DEBUG
        if (pBuffer->m_Err) {
            pBuffer = pBuffer;
        }
#endif // DD_DEBUG

        // Check if we are closed; this can happen if
        // a read comes in after we have timed out and closed
        // a connection.
        if ((NULL == m_pBlockIO) || (NULL == m_pIOSystem)) {
            // We don't have to release this; the AsyncIOStream
            // is shutdown, so it has not addRef'ed the buffer.
            DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: NULL == m_pBlockIO");
            gotoErr(ENoErr);
        } // discarding the buffer.

        // Run any standard debugger checks.
        // Do this AFTER we check whether we are closed, since
        // it treats closure as an error.
        RunChecks();


        // Some operations, like a Write(), do not need to
        // hold onto the buffer. In this case, discard the buffer as
        // soon as the IO operation completes.
        if (pBuffer->m_BufferFlags & CIOBuffer::DISCARD_WHEN_IDLE) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: DISCARD_WHEN_IDLE");

            // Detach the buffer from the foreground state BEFORE we drop our
            // reference. pBuffer must not be used once it has been released.
            // NOTE(review): RELEASE_OBJECT is defined elsewhere and may also
            // clear the pointer it is given; doing the comparison first is
            // correct either way.
            if (m_pActiveIOBuffer == pBuffer) {
                m_pActiveIOBuffer = NULL;
                m_pFirstValidByte = NULL;
                m_pNextValidByte = NULL;
                m_pEndValidBytes = NULL;
                m_pLastPossibleValidByte = NULL;
            }

            if (m_pBlockIO->m_fSeekable) {
                pBuffer->m_StreamBufferList.RemoveFromQueue();
                RELEASE_OBJECT(pBuffer);
            }

            gotoErr(ENoErr);
        } // discarding the buffer.


        //////////////////////////////////////////////
        // A read completed.
        if (CIOBuffer::READ == pBuffer->m_BufferOp) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: Finished a read. err = %d", pBuffer->m_Err);
            DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: pBuffer->m_NumValidBytes = %d", pBuffer->m_NumValidBytes);

            // A read error on a network means the peer disconnected.
            // Report this as a special case.
            if ((pBuffer->m_Err)
               && (CAsyncBlockIO::NETWORK_MEDIA == m_pBlockIO->m_MediaType)
               && !(m_AsyncIOStreamFlags & UDP_SERVER_STREAM)) {
                fCallOnError = true;
                eventErr = pBuffer->m_Err;
            }

            // If this is a stream blockIO, then buffers will be allocated
            // by the block IO when they are available. So, we have to add some
            // of the additional flags that are normally set by this module when
            // we allocate the buffers.
            if (!(m_pBlockIO->m_fSeekable)) {
                pBuffer->m_BufferFlags |= CIOBuffer::INPUT_BUFFER;

                // If this is just an error, we don't need to keep the buffer.
                if (!(pBuffer->m_Err)) {
                    // Keep m_IOBufferList in LRU order. New buffers are the most recently
                    // used, so they are added to the head of the list.
                    m_IOBufferList.InsertHead(&(pBuffer->m_StreamBufferList));
                    ADDREF_OBJECT(pBuffer);
                }

                m_pBlockIO->CancelTimeout(CIOBuffer::READ);
            } // (!(m_pBlockIO->m_fSeekable))

            pBuffer->m_BufferOp = CIOBuffer::NO_OP;

            // If we just read a UDP buffer, then create a new data stream
            // for this new data.
            if (m_AsyncIOStreamFlags & UDP_SERVER_STREAM) {
                DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: Finished a UDP read.");
                //OnUDPRead(pBuffer);
                gotoErr(ENoErr);
            }

            if (pBuffer->m_NumValidBytes > 0) {
                // If this is a stream device, then adjust m_PosInMedia.
                // It was set in the select thread, which can't grab the stream's lock.
                // If this was set at the same time that we were removing bytes,
                // then it is not accurate.
                if (!(m_pBlockIO->m_fSeekable)) {
                    pBuffer->m_PosInMedia = m_TotalAvailableBytes;
                    m_TotalAvailableBytes += pBuffer->m_NumValidBytes;

                    // NOTE(review): these two values look like 64-bit positions
                    // but are printed with %d; confirm their declared types.
                    DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: New m_TotalAvailableBytes = %d", m_TotalAvailableBytes);
                    DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: pBuffer->m_PosInMedia = %d", pBuffer->m_PosInMedia);
                }

                if (pBuffer == m_pActiveIOBuffer) {
                    m_pEndValidBytes = m_pFirstValidByte + pBuffer->m_NumValidBytes;
                }
            } // (pBuffer->m_NumValidBytes > 0)


            // If we are currently waiting for more readable data, then this may provide
            // enough data and we can stop waiting.
            if (NOT_LOADING != m_AsynchLoadType) {
                DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: We are currently loading new bytes");

                if (pBuffer->m_NumValidBytes > 0) {
                    m_NextAsynchBufferPosition += pBuffer->m_NumValidBytes;
                }

                // An error ends the load. New data lets the load decide
                // whether it has enough bytes or must keep reading.
                if (pBuffer->m_Err) {
                    fCallFinishAsyncLoad = true;
                    finishAsyncLoadErr = pBuffer->m_Err;
                } else if (pBuffer->m_NumValidBytes > 0) {
                    fCallContinueAsyncLoad = true;
                    continueAsyncLoadErr = pBuffer->m_Err;
                }
            } // (NOT_LOADING != m_AsynchLoadType)

            // Some network connections may be slow. If this is the case,
            // then check if we can combine this buffer with the previous buffer.
            if ((!(m_pBlockIO->m_fSeekable))
                && (!(pBuffer->m_Err))
                && (pBuffer->m_NumValidBytes < MIN_REASONABLE_NETWORK_PACKET)) {
                // Look for the previous buffer; this is the buffer whose data
                // ends exactly where the new buffer starts. The new buffer is
                // already in the list, and when it holds 0 bytes it would
                // match itself, so it must be skipped explicitly. Otherwise
                // we would release it and then put it back into the list
                // without a reference.
                pPrevBuffer = m_IOBufferList.GetHead();
                while (pPrevBuffer) {
                    if ((pPrevBuffer != pBuffer)
                        && ((pPrevBuffer->m_PosInMedia + pPrevBuffer->m_NumValidBytes)
                                == pBuffer->m_PosInMedia))
                    {
                        break;
                    }
                    pPrevBuffer = pPrevBuffer->m_StreamBufferList.GetNextInQueue();
                } // Look for the previous buffer.

                // Check if we found a previous buffer that has enough unused
                // space to also hold the data from the new buffer.
                if (NULL != pPrevBuffer) {
                    int32 offset = pPrevBuffer->m_pLogicalBuffer - pPrevBuffer->m_pPhysicalBuffer;

                    if ((offset + pPrevBuffer->m_NumValidBytes + pBuffer->m_NumValidBytes)
                        < (pPrevBuffer->m_BufferSize))
                    {
                        DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: Combine adjacent buffers");

                        // Copy the new buffer into the previous buffer.
                        memcpy(
                            pPrevBuffer->m_pLogicalBuffer + pPrevBuffer->m_NumValidBytes,
                            pBuffer->m_pLogicalBuffer,
                            pBuffer->m_NumValidBytes);
                        pPrevBuffer->m_NumValidBytes += pBuffer->m_NumValidBytes;

                        // Discard the new buffer.
                        pBuffer->m_StreamBufferList.RemoveFromQueue();
                        RELEASE_OBJECT(pBuffer);

                        pBuffer = pPrevBuffer;
                        if (m_pActiveIOBuffer == pBuffer)
                        {
                            m_pEndValidBytes = m_pFirstValidByte + m_pActiveIOBuffer->m_NumValidBytes;
                        }

                        // Keep m_IOBufferList in LRU order. If we are touching this buffer, then
                        // move it to the head of the list.
                        pPrevBuffer->m_StreamBufferList.RemoveFromQueue();
                        m_IOBufferList.InsertHead(&(pPrevBuffer->m_StreamBufferList));
                    }
                    // If the new is too large to fit in the prev, then the
                    // prev will be too large to fit in the new. Both the
                    // new and the prev are normally the same size.
                } // if (NULL != pPrevBuffer)
            } // if (pBuffer->m_NumValidBytes < MIN_REASONABLE_NETWORK_PACKET)

            gotoErr(ENoErr);
        } // (CIOBuffer::READ == pBuffer->m_BufferOp)


        //////////////////////////////////////////////
        // A write completed.
        if (CIOBuffer::WRITE == pBuffer->m_BufferOp) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: Finished a write. err = %d", pBuffer->m_Err);

            // Save the result now; the buffer may be released below.
            pBuffer->m_BufferOp = CIOBuffer::NO_OP;
            bufferErr = pBuffer->m_Err;

            // Detach the buffer from the foreground state BEFORE we drop our
            // reference. pBuffer must not be used once it has been released.
            if (m_pActiveIOBuffer == pBuffer) {
                m_pActiveIOBuffer = NULL;
                m_pFirstValidByte = NULL;
                m_pNextValidByte = NULL;
                m_pEndValidBytes = NULL;
                m_pLastPossibleValidByte = NULL;
            }

            // If this is a stream, then we are done with write buffers as soon
            // as they are complete. We do not read data from these buffers, so
            // we no longer need them.
            if (!(m_pBlockIO->m_fSeekable)) {
                pBuffer->m_StreamBufferList.RemoveFromQueue();
                RELEASE_OBJECT(pBuffer);
            }

            if ((m_AsyncIOStreamFlags & FLUSHING)
               || (m_AsyncIOStreamFlags & WAITING_ON_FLUSH)) {
                m_NumFlushWrites--;
                DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: New m_NumFlushWrites = %d", m_NumFlushWrites);

                if (bufferErr) {
                    m_FlushErr = bufferErr;
                }

                // Check if there are any buffers still being written.
                // If no blocks are being written, then the flush is done.
                // While FLUSHING is set, Flush() is still counting writes and
                // will report completion itself.
                if ((m_NumFlushWrites <= 0) && (m_AsyncIOStreamFlags & WAITING_ON_FLUSH)) {
                    DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOEvent: Call flush");
                    fCallOnFlush = true;
                    gotoErr(err);
                }
            } // (m_AsyncIOStreamFlags & FLUSHING)
        } // (CIOBuffer::WRITE == pBuffer->m_BufferOp)
    } ///////////////////////////////////////////////////////

abort:
    // We are outside the locked region here. Everything below may call
    // the event handler.
    if (fCallContinueAsyncLoad) {
        ContinueAsyncLoad(continueAsyncLoadErr, true);
    }
    if (fCallFinishAsyncLoad) {
        FinishAsyncLoad(finishAsyncLoadErr);
    }
    if (fCallOnFlush) {
        FinishFlush();
    }
    if (fCallOnError) {
        CAsyncIOEventHandler *pEventHandler = NULL;

        // Only hold the lock long enough to get a reference to the handler.
        // The callback itself is called without the lock, as required by
        // the comment at the top of this method.
        {
            AutoLock(m_pLock);
            if (NULL != m_pEventHandler) {
                pEventHandler = m_pEventHandler;
                ADDREF_OBJECT(pEventHandler);
            }
        }

        if (NULL != pEventHandler) {
            pEventHandler->OnStreamDisconnect(eventErr, this, m_pEventHandlerContext);
            RELEASE_OBJECT(pEventHandler);
        } // (NULL != pEventHandler)
    }
} // OnBlockIOEvent.







/////////////////////////////////////////////////////////////////////////////
//
// [OnBlockIOOpen]
//
// This is a CAsyncBlockIOCallback method. It is called with the result of
// opening a block IO. The stream replaces its current block IO with pBlockIO,
// finishes the open if the block IO opened successfully, and then reports
// the final result to the event handler through OnOpenAsyncIOStream.
//
// resultErr - The result of opening the block IO.
// pBlockIO - The block IO that was opened. The stream holds its own
//    reference to it.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::OnBlockIOOpen(ErrVal resultErr, CAsyncBlockIO *pBlockIO) {
    DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOOpen, err = %d, blockIO = %p", resultErr, pBlockIO);

    // Take the new reference before dropping the old one. If pBlockIO is the
    // same object that we already hold, then releasing first could drop the
    // last reference to an object we are about to keep using.
    if (NULL != pBlockIO) {
        ADDREF_OBJECT(pBlockIO);
    }
    RELEASE_OBJECT(m_pBlockIO);
    m_pBlockIO = pBlockIO;

    // If the stream cannot finish opening, then the open failed. Report
    // that to the handler rather than reporting a success.
    if (!resultErr) {
        resultErr = FinishOpenCommand();
    }

    if (m_pEventHandler) {
        CAsyncIOEventHandler *pEventHandler = m_pEventHandler;
        ADDREF_OBJECT(pEventHandler);
        pEventHandler->OnOpenAsyncIOStream(resultErr, this, m_pEventHandlerContext);
        RELEASE_OBJECT(pEventHandler);
    }
} // OnBlockIOOpen.






/////////////////////////////////////////////////////////////////////////////
//
// [OnBlockIOAccept]
//
// This is a CAsyncBlockIOCallback method. It is called when we accept a new
// network connection from a remote client. Wrap this in a CAsyncIOStream and
// pass it to our callback.
//
// resultErr - The result of the accept. If this is an error, then nothing
//    is created and the event handler is not called.
// pBlockIO - The block IO for the new connection. The new stream holds its
//    own reference to it.
//
// The local reference to the new stream is always released before returning,
// so the event handler must AddRef the stream if it wants to keep it.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::OnBlockIOAccept(ErrVal resultErr, CAsyncBlockIO *pBlockIO) {
    ErrVal err = ENoErr;
    CAsyncIOStream *pAsyncIOStream = NULL;
    CAsyncIOEventHandler *pEventHandler = NULL;

    DEBUG_LOG_VERBOSE("CAsyncIOStream::OnBlockIOAccept, err = %d, blockIO = %p", resultErr, pBlockIO);

    if ((resultErr) || (NULL == pBlockIO)) {
        gotoErr(resultErr);
    }

    // Create a new stream.
    pAsyncIOStream = newex CAsyncIOStream;
    if (NULL == pAsyncIOStream) {
        gotoErr(EFail);
    }
    err = pAsyncIOStream->StartOpenCommand(NULL, NULL);
    if (err) {
        gotoErr(err);
    }

    // Connect the AsyncIOStream to the block IO. This will allow the
    // AsyncIOStream to read and buffer all incoming data.
    pAsyncIOStream->m_pBlockIO = pBlockIO;
    ADDREF_OBJECT(pBlockIO);

    err = pAsyncIOStream->FinishOpenCommand();
    if (err) {
        gotoErr(err);
    }

    // Notify our callback. There may be no handler registered; the other
    // notifications in this module check for that, and so must this one.
    if (NULL != m_pEventHandler) {
        pEventHandler = m_pEventHandler;
        ADDREF_OBJECT(pEventHandler);
        pEventHandler->OnAcceptConnection(pAsyncIOStream, m_pEventHandlerContext);
        RELEASE_OBJECT(pEventHandler);
    }

abort:
    RELEASE_OBJECT(pAsyncIOStream);
    return;
} // OnBlockIOAccept.






/////////////////////////////////////////////////////////////////////////////
//
// [MoveBufferToBackground]
//
// Stops using pBuffer as a foreground (active) buffer. If pBuffer is the
// active output buffer or the active input buffer, then the matching set of
// foreground pointers is cleared. If the buffer has unsaved changes, then
// it is passed to WriteBackgroundBuffer.
//
// pBuffer - The buffer to move. A NULL buffer, or a buffer that is not
//    currently active, is ignored and is not an error.
//
// This returns EFail if the stream has no block IO. Otherwise, it returns
// ENoErr or the result of WriteBackgroundBuffer.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::MoveBufferToBackground(CIOBuffer *pBuffer) {
    ErrVal err = ENoErr;
    bool fIsActiveOutput;
    bool fIsActiveInput;
    AutoLock(m_pLock);

    // The stream must be attached to a block IO.
    if (NULL == m_pBlockIO) {
        gotoErr(EFail);
    }

    // There is nothing to do without a buffer.
    if (NULL == pBuffer) {
        gotoErr(ENoErr);
    }

    // The output role is checked first, so a buffer is only treated as the
    // active input buffer when it is not the active output buffer.
    fIsActiveOutput = (pBuffer == m_pActiveOutputIOBuffer);
    fIsActiveInput = (!fIsActiveOutput) && (pBuffer == m_pActiveIOBuffer);

    // If this buffer is not active, then ignore it.
    if (!(fIsActiveOutput || fIsActiveInput)) {
        gotoErr(ENoErr);
    }

    // Forget the foreground pointers that refer to this buffer.
    if (fIsActiveOutput) {
        m_pLastPossibleValidOutputByte = NULL;
        m_pNextValidOutputByte = NULL;
        m_pFirstValidOutputByte = NULL;
        m_pActiveOutputIOBuffer = NULL;
    } else {
        m_pLastPossibleValidByte = NULL;
        m_pEndValidBytes = NULL;
        m_pNextValidByte = NULL;
        m_pFirstValidByte = NULL;
        m_pActiveIOBuffer = NULL;
    }

    // Save any changes that were made while the buffer was active.
    if (0 != (pBuffer->m_BufferFlags & CIOBuffer::UNSAVED_CHANGES)) {
        err = WriteBackgroundBuffer(pBuffer);
    }

abort:
    returnErr(err);
} // MoveBufferToBackground.







/////////////////////////////////////////////////////////////////////////////
//
// [WriteBackgroundBuffer]
//
// Starts an asynchronous write of one buffer that has unsaved changes.
// The UNSAVED_CHANGES flag is cleared and the write offset is reset to 0
// before the buffer is passed to the block IO. An expanding memory stream
// has its flag cleared, but nothing is written.
//
// pBuffer - The buffer to write. A buffer with no unsaved changes is
//    ignored and is not an error.
//
// This returns EFail if the stream has no block IO, and ENoErr otherwise.
// The completion of the write is not reported here.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::WriteBackgroundBuffer(CIOBuffer *pBuffer) {
    ErrVal err = ENoErr;
    AutoLock(m_pLock);

    // The stream must be attached to a block IO.
    if (NULL == m_pBlockIO) {
        gotoErr(EFail);
    }

    // A clean buffer does not need to be written.
    if (0 == (pBuffer->m_BufferFlags & CIOBuffer::UNSAVED_CHANGES)) {
        gotoErr(ENoErr);
    }

    // The buffer is considered saved as soon as the write is started.
    pBuffer->m_StartWriteOffset = 0;
    pBuffer->m_BufferFlags &= ~CIOBuffer::UNSAVED_CHANGES;

    // Expanding memory streams are never written to a block IO.
    if (0 == (m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)) {
        m_pBlockIO->WriteBlockAsync(pBuffer, 0);
    }

abort:
    returnErr(err);
} // WriteBackgroundBuffer.







/////////////////////////////////////////////////////////////////////////////
//
// [AllocAsyncIOStreamBuffer]
//
// This returns an empty buffer for the stream to use. The buffer is either
// newly allocated, or else it is an idle buffer that is recycled from
// m_IOBufferList.
//
// bufferStartPos - A position in the media. The buffer's m_PosInMedia is set
//    to the result of passing this to the IO system's GetIOStartPosition.
// fInputBuffer - Selects the list that a NEW buffer is added to: the head of
//    m_IOBufferList if true, or the tail of m_OutputBufferList if false.
//    It also controls whether INPUT_BUFFER is set. A recycled buffer stays
//    where it already is in m_IOBufferList.
//
// This returns NULL if the stream is closed or if a usable buffer cannot be
// allocated.
/////////////////////////////////////////////////////////////////////////////
CIOBuffer *
CAsyncIOStream::AllocAsyncIOStreamBuffer(int64 bufferStartPos, bool fInputBuffer) {
    ErrVal err = ENoErr;
    CIOBuffer *pBuffer = NULL;
    bool fAllocBuffer = true;
    AutoLock(m_pLock);


    if ((NULL == m_pBlockIO) || (NULL == m_pIOSystem)) {
        gotoErr(EFail);
    }


    // We will either allocate a new block, or else recycle an existing one.
    // m_IOBufferList is in LRU order, so try to remove a buffer from the end.
    // The most recently used buffer is at the head. Start at the tail and
    // use the prev pointers to go towards the head. Only a buffer with no
    // I/O operation in progress can be recycled.
    if ((m_pBlockIO->m_fSeekable)
            && !(m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)
            && !(m_AsyncIOStreamFlags & ALL_DATA_IS_IN_BUFFERS)
            && (m_IOBufferList.GetLength() >= m_MaxNumIOBuffersForSeekableDevices)) {
        pBuffer = m_IOBufferList.GetTail();
        while (NULL != pBuffer) {
            if (CIOBuffer::NO_OP == pBuffer->m_BufferOp) {
                DEBUG_LOG_VERBOSE("CAsyncIOStream::AllocAsyncIOStreamBuffer. Recycle a buffer");
                break;
            }
            pBuffer = pBuffer->m_StreamBufferList.GetPreviousInQueue();
        } // while (NULL != pBuffer)
    } // Recycle a buffer.


    // If we cannot recycle a buffer, then allocate a new one.
    if (NULL == pBuffer) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::AllocAsyncIOStreamBuffer. Allocate a new buffer");

        pBuffer = m_pIOSystem->AllocIOBuffer(-1, fAllocBuffer);
        if ((NULL == pBuffer)
            || (NULL == pBuffer->m_pPhysicalBuffer)
            || (pBuffer->m_BufferSize < 0)) {
            // If we got a buffer object that is not usable, then it is
            // released in the abort path.
            gotoErr(EFail);
        }

        // The buffer was AddRef'ed by AllocIOBuffer.
        if (fInputBuffer) {
            // Keep m_IOBufferList in LRU order. New buffers are the most recently
            // used, so they are added to the head of the list.
            m_IOBufferList.InsertHead(&(pBuffer->m_StreamBufferList));
        } else {
            // The output list is FIFO, so new buffers are inserted at the tail.
            m_OutputBufferList.InsertTail(&(pBuffer->m_StreamBufferList));
        }
    } // Allocate a new buffer.

    // Reset the buffer so it looks empty, whether it is new or recycled.
    pBuffer->m_BufferOp = CIOBuffer::NO_OP;
    pBuffer->m_BufferFlags &= ~CIOBuffer::VALID_DATA;
    pBuffer->m_Err = ENoErr;
    if (fInputBuffer) {
        pBuffer->m_BufferFlags |= CIOBuffer::INPUT_BUFFER;
    }
    // OUTPUT_BUFFER is set on every buffer, including input buffers.
    pBuffer->m_BufferFlags |= CIOBuffer::OUTPUT_BUFFER;

    // buffer and bufferSize are initialized by AllocIOBuffer.
    pBuffer->m_pLogicalBuffer = pBuffer->m_pPhysicalBuffer;
    pBuffer->m_NumValidBytes = 0;

    pBuffer->m_PosInMedia = m_pIOSystem->GetIOStartPosition(bufferStartPos);

    return(pBuffer);

abort:
    // The only way to get here with a buffer is when a newly allocated
    // buffer failed the checks above. It has not been added to any list, so
    // the reference from AllocIOBuffer is the only thing we have to undo.
    RELEASE_OBJECT(pBuffer);
    return(NULL);
} // AllocAsyncIOStreamBuffer.







/////////////////////////////////////////////////////////////////////////////
//
// [Flush]
//
// Starts writing every buffer that has unsaved changes, in both
// m_IOBufferList and m_OutputBufferList, and forgets the foreground input
// and output buffers.
//
// m_NumFlushWrites counts the writes that this flush waits for. That includes
// the writes started here and any writes that were already in progress.
// If nothing is being written, then FinishFlush is called before this
// returns. Otherwise, WAITING_ON_FLUSH is left set and OnBlockIOEvent calls
// FinishFlush when the last of the counted writes completes.
//
// The result of the flush is stored in m_FlushErr and is reported by
// FinishFlush. This method itself returns nothing.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::Flush() {
    ErrVal err = ENoErr;
    CIOBuffer *pBuffer;
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush");

    // Reset the bookkeeping before anything can fail. The abort path looks
    // at m_NumFlushWrites to decide if the flush is done, and FinishFlush
    // reports m_FlushErr, so neither can be left over from a previous flush.
    m_NumFlushWrites = 0;
    m_FlushErr = ENoErr;

    // The stream must have been initialized.
    if (NULL == m_pBlockIO) {
        m_FlushErr = EFail;
        gotoErr(EFail);
    }

    m_AsyncIOStreamFlags |= FLUSHING;


    // In some cases, we never write data to the backing store,
    // so there is nothing to do.
    if ((m_AsyncIOStreamFlags & EXPANDING_MEMORY_STREAM)
            || (m_AsyncIOStreamFlags & ALL_DATA_IS_IN_BUFFERS)) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush is a no-op on a memory stream");
        gotoErr(ENoErr);
    }

    // Write the idle buffers that have unsaved changes, and count the
    // buffers that are already being written.
    pBuffer = m_IOBufferList.GetHead();
    while (pBuffer) {
        if ((CIOBuffer::NO_OP == pBuffer->m_BufferOp)
            && (pBuffer->m_BufferFlags & CIOBuffer::UNSAVED_CHANGES)) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush writes one buffer (%p)", pBuffer);
            err = WriteBackgroundBuffer(pBuffer);
            if (err) {
                // This write never started, so there is nothing to wait for.
                // Remember the error so the flush does not report success.
                m_FlushErr = err;
            } else {
                m_NumFlushWrites += 1;
            }
        } else if (CIOBuffer::WRITE == pBuffer->m_BufferOp) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush found a buffer already being written");
            m_NumFlushWrites += 1;
        }
        pBuffer = pBuffer->m_StreamBufferList.GetNextInQueue();
    } // while (pBuffer)

    // Do the same for the output buffers.
    pBuffer = m_OutputBufferList.GetHead();
    while (pBuffer) {
        if ((CIOBuffer::NO_OP == pBuffer->m_BufferOp)
            && (pBuffer->m_BufferFlags & CIOBuffer::UNSAVED_CHANGES)) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush writes one buffer (%p)", pBuffer);
            err = WriteBackgroundBuffer(pBuffer);
            if (err) {
                m_FlushErr = err;
            } else {
                m_NumFlushWrites += 1;
            }
        } else if (CIOBuffer::WRITE == pBuffer->m_BufferOp) {
            DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush found a buffer already being written");
            m_NumFlushWrites += 1;
        }
        pBuffer = pBuffer->m_StreamBufferList.GetNextInQueue();
    } // while (pBuffer)

    DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush m_NumFlushWrites = %d", m_NumFlushWrites);

    // There are no foreground buffers after a flush.
    m_pActiveIOBuffer = NULL;
    m_pFirstValidByte = NULL;
    m_pNextValidByte = NULL;
    m_pEndValidBytes = NULL;
    m_pLastPossibleValidByte = NULL;

    m_pActiveOutputIOBuffer = NULL;
    m_pFirstValidOutputByte = NULL;
    m_pNextValidOutputByte = NULL;
    m_pLastPossibleValidOutputByte = NULL;

abort:
    // If no blocks are being written, then the flush is done.
    // Be careful. Don't do this if some other method has
    // already called OnFlush.
    m_AsyncIOStreamFlags &= ~FLUSHING;
    m_AsyncIOStreamFlags |= WAITING_ON_FLUSH;
    if (m_NumFlushWrites <= 0) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::Flush calls FinishFlush");
        FinishFlush();
    }
} // Flush








/////////////////////////////////////////////////////////////////////////////
//
// [FinishFlush]
//
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::FinishFlush() {
    AutoLock(m_pLock);
    RunChecks();

    DEBUG_LOG_VERBOSE("CAsyncIOStream::FinishFlush");
    if (m_pBlockIO->m_NumActiveWrites > 0) {
       DEBUG_WARNING("There are active reads or writes.");
    }

    m_NumFlushWrites = 0;
    m_AsyncIOStreamFlags &= ~FLUSHING;
    m_AsyncIOStreamFlags &= ~WAITING_ON_FLUSH;

    if (NULL != m_pEventHandler) {
        CAsyncIOEventHandler *pEventHandler = m_pEventHandler;
        ADDREF_OBJECT(pEventHandler);
        pEventHandler->OnFlush(m_FlushErr, this, m_pEventHandlerContext);
        RELEASE_OBJECT(pEventHandler);
     } // (NULL != m_pEventHandler)
} // FinishFlush.







/////////////////////////////////////////////////////////////////////////////
//
// [CheckState]
//
// A consistency check of the stream. It checks the block IO, the IO system
// and every buffer in both buffer lists, and it checks that the foreground
// input and output pointers agree with the buffers they refer to.
//
// This returns ENoErr if the stream is consistent, or if the stream is
// closed. It returns EFail when any check fails.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::CheckState() {
    ErrVal err = ENoErr;
    CIOBuffer *pBuffer;
    CIOBuffer *pNextBuffer;
    bool fFoundLargerBuffer;
    bool foregroundIsOK;
    int64 activeBufferStartPos;
    int64 activeBufferStopPos;
    AutoLock(m_pLock);

    // This means the stream has closed. This happens all the
    // time with net connections to web servers, and it is not
    // an error.
    // m_pBlockIO is dereferenced below, so it is checked here as well.
    // NOTE(review): this assumes a stream with an IO system but no block IO
    // (for example, one that is still opening) is also not an error - confirm.
    if ((NULL == m_pIOSystem) || (NULL == m_pBlockIO)) {
        gotoErr(ENoErr);
    }

    err = m_pBlockIO->CheckState();
    if (err) {
        gotoErr(EFail);
    }
    err = m_pIOSystem->CheckState();
    if (err) {
        gotoErr(EFail);
    }

    // Without a foreground buffer, none of the foreground pointers
    // may be set.
    if (NULL == m_pActiveIOBuffer) {
        if ((m_pFirstValidByte)
            || (m_pNextValidByte)
            || (m_pEndValidBytes)) {
            gotoErr(EFail);
        }
    }
    if (NULL == m_pActiveOutputIOBuffer) {
        if ((m_pFirstValidOutputByte)
            || (m_pNextValidOutputByte)) {
            gotoErr(EFail);
        }
    }

    // Check the foreground input pointers against the active input buffer.
    if (m_pActiveIOBuffer) {
        // Check m_pFirstValidByte
        if ((m_pFirstValidByte)
            && (m_pActiveIOBuffer->m_pLogicalBuffer)
            && (m_pFirstValidByte != m_pActiveIOBuffer->m_pLogicalBuffer)) {
            gotoErr(EFail);
        }

        // Check m_pNextValidByte
        if ((m_pNextValidByte)
                && (m_pActiveIOBuffer->m_pLogicalBuffer)) {
            if (m_pNextValidByte > (m_pActiveIOBuffer->m_pLogicalBuffer + m_pActiveIOBuffer->m_NumValidBytes)) {
                gotoErr(EFail);
            }
            if (m_pNextValidByte < m_pActiveIOBuffer->m_pLogicalBuffer) {
                gotoErr(EFail);
            }
        }

        // Check m_pEndValidBytes
        if ((m_pEndValidBytes)
            && (m_pActiveIOBuffer->m_pLogicalBuffer)) {
            if (m_pEndValidBytes < m_pActiveIOBuffer->m_pLogicalBuffer) {
                gotoErr(EFail);
            }
            // While we are in the middle of a read, m_pEndValidBytes may still
            // point to the beginning of the buffer. This is not an error.
            if (m_pEndValidBytes > (m_pActiveIOBuffer->m_pLogicalBuffer + m_pActiveIOBuffer->m_NumValidBytes)) {
                gotoErr(EFail);
            }
        }

        // Check m_pLastPossibleValidByte
        if ((m_pLastPossibleValidByte)
            && (m_pActiveIOBuffer->m_pPhysicalBuffer)) {
            if (m_pLastPossibleValidByte < m_pActiveIOBuffer->m_pPhysicalBuffer) {
                gotoErr(EFail);
            }
            if (m_pLastPossibleValidByte > (m_pActiveIOBuffer->m_pPhysicalBuffer + m_pActiveIOBuffer->m_BufferSize)) {
                gotoErr(EFail);
            }
        }

        // The pointers must be in order: first <= next <= end.
        if ((m_pFirstValidByte)
            && (m_pEndValidBytes)
            && (m_pFirstValidByte > m_pEndValidBytes)) {
            gotoErr(EFail);
        }

        if ((m_pNextValidByte)
            && (m_pFirstValidByte)
            && (m_pEndValidBytes)
            && ((m_pNextValidByte < m_pFirstValidByte)
                || (m_pNextValidByte > m_pEndValidBytes))) {
            gotoErr(EFail);
        }
    } // if (m_pActiveIOBuffer)


    // Check the foreground output pointers against the active output buffer.
    if (m_pActiveOutputIOBuffer) {
        // Check m_pFirstValidOutputByte
        if ((m_pFirstValidOutputByte)
            && (m_pActiveOutputIOBuffer->m_pLogicalBuffer)
            && (m_pFirstValidOutputByte != m_pActiveOutputIOBuffer->m_pLogicalBuffer)) {
            gotoErr(EFail);
        }

        // Check m_pNextValidOutputByte
        if ((m_pNextValidOutputByte)
            && (m_pActiveOutputIOBuffer->m_pLogicalBuffer)) {
            if (m_pNextValidOutputByte
                > (m_pActiveOutputIOBuffer->m_pLogicalBuffer + m_pActiveOutputIOBuffer->m_NumValidBytes)) {
                gotoErr(EFail);
            }
            if (m_pNextValidOutputByte < m_pActiveOutputIOBuffer->m_pLogicalBuffer) {
                gotoErr(EFail);
            }
        }

        // Check m_pLastPossibleValidOutputByte
        if ((m_pLastPossibleValidOutputByte)
            && (m_pActiveOutputIOBuffer->m_pPhysicalBuffer)) {
            if (m_pLastPossibleValidOutputByte < m_pActiveOutputIOBuffer->m_pPhysicalBuffer) {
                gotoErr(EFail);
            }
            if (m_pLastPossibleValidOutputByte
                > (m_pActiveOutputIOBuffer->m_pPhysicalBuffer + m_pActiveOutputIOBuffer->m_BufferSize)) {
                gotoErr(EFail);
            }
        }

        if ((m_pFirstValidOutputByte)
            && (m_pNextValidOutputByte)
            && (m_pFirstValidOutputByte > m_pNextValidOutputByte)) {
            gotoErr(EFail);
        }
    } // if (m_pActiveOutputIOBuffer)


    // Now, check every buffer in the list.
    pBuffer = m_IOBufferList.GetHead();
    while (pBuffer) {
        err = pBuffer->CheckState();
        if (err) {
            gotoErr(EFail);
        }

        // Look for the next larger buffer. If any buffer starts at a larger
        // position than this one, then one of them must start exactly where
        // this buffer ends. Otherwise, there is a gap or an overlap.
        fFoundLargerBuffer = false;
        pNextBuffer = m_IOBufferList.GetHead();
        while (pNextBuffer) {
            if (pNextBuffer->m_PosInMedia > pBuffer->m_PosInMedia) {
                fFoundLargerBuffer = true;
                if ((pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes)
                    == pNextBuffer->m_PosInMedia) {
                    break;
                }
            }

            pNextBuffer = pNextBuffer->m_StreamBufferList.GetNextInQueue();
        }

        // pNextBuffer is NULL here only if the search ran off the end of
        // the list without finding an adjacent buffer.
        if ((fFoundLargerBuffer) && (!pNextBuffer)) {
            gotoErr(EFail);
        }

        pBuffer = pBuffer->m_StreamBufferList.GetNextInQueue();
    } // while (pBuffer)


    pBuffer = m_OutputBufferList.GetHead();
    while (pBuffer) {
        err = pBuffer->CheckState();
        if (err) {
            gotoErr(EFail);
        }

        pBuffer = pBuffer->m_StreamBufferList.GetNextInQueue();
    }

    // Make sure the foreground buffer doesn't overlap with
    // any background buffer.
    if (NULL == m_pActiveIOBuffer) {
        foregroundIsOK = true;
    } else
    {
        // The stop position is the position of the last valid byte. An empty
        // buffer is treated as the single position where it starts.
        activeBufferStartPos = m_pActiveIOBuffer->m_PosInMedia;
        if (0 == m_pActiveIOBuffer->m_NumValidBytes) {
            activeBufferStopPos = m_pActiveIOBuffer->m_PosInMedia;
        } else {
            activeBufferStopPos = m_pActiveIOBuffer->m_PosInMedia + m_pActiveIOBuffer->m_NumValidBytes - 1;
        }
        foregroundIsOK = false;

        pBuffer = m_IOBufferList.GetHead();
        while (pBuffer) {
            // The foreground buffer must itself be in the list.
            if (pBuffer == m_pActiveIOBuffer) {
                foregroundIsOK = true;
            }
            // Make sure this buffer does not overlap any other buffers.
            // Neither end of the foreground buffer may be inside the range
            // of valid bytes in another buffer.
            else {
                if ((activeBufferStartPos >= pBuffer->m_PosInMedia)
                    && (activeBufferStartPos < (pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes))) {
                    gotoErr(EFail);
                }
                if ((activeBufferStopPos >= pBuffer->m_PosInMedia)
                    && (activeBufferStopPos < (pBuffer->m_PosInMedia + pBuffer->m_NumValidBytes))) {
                    gotoErr(EFail);
                }
            }

            pBuffer = pBuffer->m_StreamBufferList.GetNextInQueue();
        }

        if (!foregroundIsOK) {
            gotoErr(EFail);
        }
    } // if (NULL != m_pActiveIOBuffer)

    // A seekable stream must not have a foreground output buffer.
    if ((NULL != m_pActiveOutputIOBuffer)
        && (m_pBlockIO->m_fSeekable)) {
        gotoErr(EFail);
    }

abort:
    returnErr(err);
} // CheckState.







//////////////////////////////////////////////////////////////////////////////
//
// [GetChar]
//
// Read the next character from the stream into pBuffer and report how many
// bytes were stored through pCharLen.
//
// The stream currently has no explicit Unicode support, so a "character" is
// always exactly one byte; multi-byte UTF-8 sequences are returned one byte
// per call.
//
// pBuffer    - Receives the byte. Must not be NULL.
// maxCharLen - Capacity of pBuffer. Must not be negative. If it is 0, nothing
//              is read and *pCharLen is set to 0.
// pCharLen   - Receives the number of bytes stored (0 or 1). Must not be NULL.
//
// Returns EFail for invalid arguments, or whatever error Read() reports.
//////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::GetChar(
                  char *pBuffer,
                  int32 maxCharLen,
                  int *pCharLen) {
    ErrVal err = ENoErr;
    AutoLock(m_pLock);

    if ((!pBuffer) || (!pCharLen) || (maxCharLen < 0)) {
        gotoErr(EFail);
    }

    *pCharLen = 0;

    // For now, no explicit Unicode support at the stream level, so exactly
    // one byte is consumed whenever the caller has room for it.
    // When that changes, this becomes a loop that stops once
    // CStringLib::IsCompleteChar(pBuffer, *pCharLen) is true.
    if (maxCharLen > 0) {
        if ((!m_pNextValidByte)
            || (!m_pEndValidBytes)
            || (m_pNextValidByte >= m_pEndValidBytes)) {
            // The active buffer is missing or used up. Read() knows how to
            // move on to the next buffer.
            err = Read(pBuffer, 1);
            DEBUG_LOG_VERBOSE("CAsyncIOStream::GetChar. Call Read() err = %d", err);
            if (err) {
                gotoErr(err);
            }
        } else {
            // Fast case: the byte is already in the active buffer.
            *pBuffer = *m_pNextValidByte;
            m_pNextValidByte++;
        }

        *pCharLen = 1;
    } // if (maxCharLen > 0)

abort:
    returnErr(err);
} // GetChar




//////////////////////////////////////////////////////////////////////////////
//
// [PutChar]
//
// Write a single character to the stream.
//
// pChar   - Points to the character. Must not be NULL.
// charLen - Length of the character in bytes. Currently ignored.
//
// NOTE(review): only the first byte at pChar is written, whatever charLen
// says, so a multi-byte UTF-8 character is truncated to its lead byte.
// This mirrors GetChar(), which also works one byte at a time.
//
// Returns EFail if pChar is NULL, otherwise the result of PutByte().
//////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::PutChar(const char *pChar, int32 charLen) {
    // Unused parameter.
    (void) charLen;

    if (!pChar) {
       returnErr(EFail);
    }

    return(PutByte(pChar[0]));
} // PutChar





/////////////////////////////////////////////////////////////////////////////
//
// [PutByte]
//
// Write a single octet (1 byte). This may or may not be a complete character
// in UTF-8.
//
// When the active buffer has room, the byte is stored directly in it and the
// buffer is marked as holding valid, unsaved data. Otherwise the byte is
// handed to Write(), which may allocate a new empty buffer.
//
// Returns EFail if there is no block IO device, or the error from Write().
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::PutByte(char c) {
    ErrVal err = ENoErr;
    bool fStoredInBuffer = false;
    AutoLock(m_pLock);

    if (!m_pBlockIO) {
        returnErr(EFail);
    }

    if (m_pBlockIO->m_fSeekable) {
        // Fast case #2.
        // A seekable device uses the same buffer for input and output.
        if ((m_pNextValidByte)
            && (m_pLastPossibleValidByte)
            && (m_pNextValidByte < m_pLastPossibleValidByte)) {
            // There is room, so store the byte in the same format as it
            // will appear in the file.
            *m_pNextValidByte = c;
            m_pNextValidByte++;

            // Writing, seeking backwards and writing again only overwrites
            // data that is already valid. The valid region grows only when
            // the write position moves past its current end.
            if ((m_pEndValidBytes) && (m_pEndValidBytes < m_pNextValidByte)) {
                m_pEndValidBytes = m_pNextValidByte;
            }

            // Record that this buffer has been written to.
            if (m_pActiveIOBuffer) {
               m_pActiveIOBuffer->m_NumValidBytes = m_pEndValidBytes - m_pFirstValidByte;
               m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::UNSAVED_CHANGES;
               m_pActiveIOBuffer->m_BufferFlags |= CIOBuffer::VALID_DATA;
            }

            fStoredInBuffer = true;
        }
    } else {
        // Fast case #1.
        // A non-seekable device keeps separate buffers for input and output.
        if ((m_pNextValidOutputByte)
            && (m_pLastPossibleValidOutputByte)
            && (m_pNextValidOutputByte < m_pLastPossibleValidOutputByte)) {
            // There is room, so store the byte in the same format as it
            // will appear on the device.
            *m_pNextValidOutputByte = c;
            m_pNextValidOutputByte++;

            // Record that this buffer has been written to.
            if (m_pActiveOutputIOBuffer) {
               m_pActiveOutputIOBuffer->m_NumValidBytes = m_pNextValidOutputByte - m_pFirstValidOutputByte;
               m_pActiveOutputIOBuffer->m_BufferFlags |= CIOBuffer::UNSAVED_CHANGES;
               m_pActiveOutputIOBuffer->m_BufferFlags |= CIOBuffer::VALID_DATA;
            }

            fStoredInBuffer = true;
        }
    }

    // The slow case. Write() may allocate a new empty buffer.
    if (!fStoredInBuffer) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::PutByte. Call Write()");
        err = Write((char *) &c, 1);
        if (err) {
            gotoErr(err);
        }
    }

abort:
    returnErr(err);
} // PutByte








//////////////////////////////////////////////////////////////////////////////
//
// [printf]
//
// Write formatted text to the stream. This is a small subset of the C
// printf language with a few local extensions; it is NOT a drop-in
// replacement for the C library function.
//
// Format string syntax:
//   \x          Write x verbatim without interpreting it (so "\%" in the
//               format writes a percent sign).
//   %d %i %l    An int32 argument, written in decimal. 'l' is a complete
//               conversion by itself, so "%ld" writes the number followed
//               by a literal 'd'.
//   %c          A single byte.
//   %C          A single byte converted with ConvertUTF8ToUTF16.
//   %s          A NUL-terminated string, written byte for byte.
//   %q          Like %s, but wrapped in double quotes.
//   %S %Q       Like %s / %q, but every byte is converted with
//               ConvertUTF8ToUTF16 and quotes/padding are written as two
//               bytes (low byte first).
//   %<N>x       N is a decimal field width (up to 32 digits).
//               Numbers are left-padded with '0' and TRUNCATED to N bytes.
//               Strings are truncated to N characters and right-padded
//               with spaces.
//   %<other>    The character after the percent sign is written literally,
//               so "%%" writes a single percent sign.
//
// A NULL string argument writes nothing. A format that ends in the middle
// of an escape or conversion silently drops the partial sequence.
//
// Returns EFail if format is NULL, EInvalidArg if a width has too many
// digits, or any error reported while writing to the stream.
//////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::printf(const char *format, ...) {
    ErrVal err = ENoErr;
    va_list argList;
    char formatChar;
    int intValue;
    char charValue;
    char *pCharPtrValue;
    int32 state = PRINTF_FORMAT_NORMAL_CHAR;
    char widthString[32];
    char numberBuffer[32];
    char convertedCharBuffer[16];
    int32 convertedCharSize;
    int widthStringLength = 0;
    char *pNumberStart;
    char *pEndPtr;
    int32 numDigits = -1;
    int32 charsWritten;
    int numActualDigits;
    AutoLock(m_pLock);


    // This must come before any jump to abort, because abort calls va_end.
    va_start(argList, format);

    if (NULL == format) {
        gotoErr(EFail);
    }


    // This is the main loop; it examines every character in the format string.
    while ((*format) && (!err)) {
        formatChar = *format;
        format++;

        switch (state) {
        ///////////////////////////
        // Print the next character from the buffer. If this is special (like \ or %) then it
        // will change the state.
        case PRINTF_FORMAT_NORMAL_CHAR:
            if ('\\' == formatChar) {
                state = PRINTF_FORMAT_ESCAPED_CHAR;
            } else if ('%' == formatChar) {
                // Prepare to process a new variable descriptor.
                state = PRINTF_FORMAT_PERCENT_CHAR;
                widthStringLength = 0;
                numDigits = -1;
            } else {
                // Write the character.
                err = PutByte(formatChar);
                if (err) {
                    gotoErr(err);
                }
            }
            break;


        ///////////////////////////
        case PRINTF_FORMAT_ESCAPED_CHAR:
            state = PRINTF_FORMAT_NORMAL_CHAR;
            // Just write the character without interpreting it.
            err = PutByte(formatChar);
            if (err) {
                gotoErr(err);
            }
            break;


        ///////////////////////////
        // Collect the remaining digits of a field width.
        case PRINTF_FORMAT_VARIABLE_WIDTH:
            if (CStringLib::IsByte(formatChar, CStringLib::NUMBER_CHAR)) {
                if (widthStringLength < (int) sizeof(widthString)) {
                    widthString[widthStringLength] = formatChar;
                    widthStringLength += 1;
                } else {
                    gotoErr(EInvalidArg);
                }
            } else {
                // This is the conversion character. Back up so the
                // PRINTF_FORMAT_PERCENT_CHAR state processes it.
                // The collected width must be preserved here; that state
                // converts widthString into numDigits. Resetting it at this
                // point would silently discard every width in the format.
                state = PRINTF_FORMAT_PERCENT_CHAR;
                format--;
            } // read the variable type char
            break;


        ///////////////////////////
        case PRINTF_FORMAT_PERCENT_CHAR:
            // We just read %x, for some x, so now switch on the value of x.
            if (CStringLib::IsByte(formatChar, CStringLib::NUMBER_CHAR)) {
                widthString[0] = formatChar;
                widthStringLength = 1;
                state = PRINTF_FORMAT_VARIABLE_WIDTH;
                // The rest of the number characters will be read in the
                // PRINTF_FORMAT_VARIABLE_WIDTH state, which appends to widthString.
            } else {
                // If there was a width string that was part of this
                // format, then convert that to a number.
                if (widthStringLength >  0) {
                    err = CStringLib::StringToNumber(widthString, widthStringLength, &numDigits);
                    if (err)
                    {
                        // An unparsable width is treated as "no width".
                        numDigits = -1;
                        err = ENoErr;
                    }
                } else {
                    numDigits = -1;
                } // convert width string to a number.


                state = PRINTF_FORMAT_NORMAL_CHAR;
                switch(formatChar) {
                ////////////////////////////////////////
                case 'l': // long
                case 'd': // integer
                case 'i': // integer
                    //<><><>Cleanup
                    intValue = va_arg(argList, int32);
                    snprintf(numberBuffer, sizeof(numberBuffer), "%d", intValue);
                    pNumberStart = numberBuffer;
                    pEndPtr = numberBuffer + strlen(numberBuffer);

                    // WARNING.
                    // numActualDigits puts a limit on the total formatted size, not the number
                    // of digits. So, it includes chars like ',' and '.'.
                    // In UTF-8, all of these characters are 1 byte.
                    numActualDigits = (int) (pEndPtr - numberBuffer);

                    if (numDigits > 0) {
                        // When padding a negative number, the sign must come
                        // before the zeros ("-05", not "0-5"). The sign still
                        // counts toward the field width.
                        if (('-' == *pNumberStart) && (numActualDigits < numDigits)) {
                            err = PutByte('-');
                            if (err) {
                                gotoErr(err);
                            }
                            pNumberStart++;
                        }

                        // Prefix this with 0's.
                        while (numActualDigits < numDigits) {
                            err = PutByte('0');
                            if (err) {
                                gotoErr(err);
                            }
                            numActualDigits++;
                        }

                        // Truncate, if necessary. This drops the trailing
                        // (least significant) characters.
                        while (numActualDigits > numDigits) {
                            DEBUG_WARNING("Truncating number.");
                            pEndPtr = pEndPtr - 1;
                            numActualDigits--;
                        }
                    } // (numDigits > 0)

                    // Write the number.
                    err = Write(pNumberStart, pEndPtr - pNumberStart);
                    if (err) {
                        gotoErr(err);
                    }
                    break;

                ////////////////////////////////////////
                case 'C': // char
                case 'c': // char
                    // A char passed through "..." is promoted to int, so it
                    // must be fetched as an int on every platform. Fetching
                    // it as a char is undefined behavior.
                    charValue = (char) va_arg(argList, int);

                    if ('c' == formatChar) {
                        err = PutByte(charValue);
                        if (err) {
                            gotoErr(err);
                        }
                    } else {
                        err = CStringLib::ConvertUTF8ToUTF16(
                                            &charValue,
                                            1,
                                            (WCHAR *) (&convertedCharBuffer[0]),
                                            sizeof(convertedCharBuffer),
                                            &convertedCharSize);
                        if (err) {
                            gotoErr(err);
                        }
                        err = Write(convertedCharBuffer, convertedCharSize);
                        if (err) {
                            gotoErr(err);
                        }
                    }
                    break;


                ////////////////////////////////////////
                case 'Q': // string
                case 'q': // string
                case 'S': // string
                case 's': // string
                    pCharPtrValue = va_arg(argList, char *);
                    charsWritten = 0;
                    if (NULL != pCharPtrValue) {
                        // Preface it with a quote.
                        if (('Q' == formatChar) || ('q' == formatChar)) {
                            err = PutByte('\"');
                            if (err) {
                                gotoErr(err);
                            }
                            // Optionally make this a UTF-16 character.
                            if ('Q' == formatChar) {
                                err = PutByte(0);
                                if (err) {
                                    gotoErr(err);
                                }
                            }
                        } // if (('Q' == formatChar) || ('q' == formatChar))

                        while (*pCharPtrValue) {
                            if (('s' == formatChar) || ('q' == formatChar)) {
                                err = PutByte(*pCharPtrValue);
                                if (err) {
                                    gotoErr(err);
                                }
                            } else {
                                err = CStringLib::ConvertUTF8ToUTF16(
                                                    pCharPtrValue,
                                                    1,
                                                    (WCHAR *) (&convertedCharBuffer[0]),
                                                    sizeof(convertedCharBuffer),
                                                    &convertedCharSize);
                                if (err) {
                                    gotoErr(err);
                                }
                                err = Write(convertedCharBuffer, convertedCharSize);
                                if (err) {
                                    gotoErr(err);
                                }
                            }

                            pCharPtrValue++;
                            charsWritten++;
                            // A width also acts as a maximum length.
                            if ((numDigits > 0) && (charsWritten >= numDigits))
                            {
                                break;
                            }
                        } // while (*pCharPtrValue)


                        // Pad the string.
                        if (numDigits > 0)
                        {
                            while (charsWritten < numDigits)
                            {
                                err = PutByte(' ');
                                if (err)
                                {
                                    gotoErr(err);
                                }
                                // Optionally make this a UTF-16 character.
                                // The second byte is 0, the same way the
                                // quote characters are written.
                                if (('S' == formatChar) || ('Q' == formatChar))
                                {
                                    err = PutByte(0);
                                    if (err)
                                    {
                                        gotoErr(err);
                                    }
                                }

                                charsWritten++;
                            } // while (charsWritten < numDigits)
                        } // if (numDigits > 0)


                        // Close with a quote.
                        if (('Q' == formatChar) || ('q' == formatChar)) {
                            err = PutByte('\"');
                            if (err) {
                                gotoErr(err);
                            }
                            // Optionally make this a UTF-16 character.
                            if ('Q' == formatChar) {
                                err = PutByte(0);
                                if (err)
                                {
                                    gotoErr(err);
                                }
                            }
                        } // if (('Q' == formatChar) || ('q' == formatChar))
                    } // (NULL != pCharPtrValue)
                    break;

                ////////////////////////////////////////
                default:
                    // A percent sign followed by a character that is not a
                    // known conversion writes that character literally and
                    // consumes no argument. For example, %% writes a single
                    // percent sign.
                    err = PutByte(formatChar);
                    if (err) {
                        gotoErr(err);
                    }
                    break;
                } // switch on variable type.
            } // read the variable type char
            break;

        default:
            break;
        } // switch on the state.
    } // main loop

abort:
    va_end(argList);
    returnErr(err);
} // printf






/////////////////////////////////////////////////////////////////////////////
//
// [SkipWhileCharType]
//
// Advance the stream position past every consecutive character whose
// properties include at least one bit of charType. On return the stream is
// positioned just BEFORE the first character that does not match.
//
// Reaching the end of the stream is not an error; the function returns
// ENoErr in that case. Any other error is passed back to the caller.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::SkipWhileCharType(int32 charType) {
    ErrVal err = ENoErr;
    char charBytes[32];
    int32 numCharBytes;
    int32 charProperties;
    int64 posBeforeChar;
    AutoLock(m_pLock);
    RunChecks();

    // Without an active buffer all of the buffer pointers are bogus, so
    // establish one first.
    if (!m_pActiveIOBuffer) {
        err = SetPosition(GetDataLength());
        if (err) {
            if (EEOF == err) {
                err = ENoErr;
            }
            gotoErr(err);
        }
    }

    // Each iteration reads one complete character, which may span buffers.
    for (;;) {
        // Remember where this character starts so we can back up to it.
        posBeforeChar = GetPosition();

        // GetChar() can handle characters that straddle buffers.
        err = GetChar(
                    charBytes,
                    sizeof(charBytes),
                    &numCharBytes);
        if (err) {
            // Running off the end of the stream just ends the skip.
            if (EEOF == err) {
                err = ENoErr;
            }
            gotoErr(err);
        }

        charProperties = CStringLib::GetCharProperties(charBytes, numCharBytes);
        if (0 == (charProperties & charType)) {
            // This is the first character that does not have the type.
            // Leave the stream positioned just before it.
            err = SetPosition(posBeforeChar);
            gotoErr(err);
        }
    } // reading one character at a time.

abort:
    returnErr(err);
} // SkipWhileCharType.






/////////////////////////////////////////////////////////////////////////////
//
// [SkipUntilCharType]
//
// Advance the stream position until a character is found whose properties
// include at least one bit of charType. On return the stream is positioned
// just BEFORE that character.
//
// Unlike SkipWhileCharType, every error from GetChar() is passed back to the
// caller, including the one reported when the stream ends before a matching
// character is found.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOStream::SkipUntilCharType(int32 charType) {
    ErrVal err = ENoErr;
    char charBytes[32];
    int32 numCharBytes;
    int32 charProperties;
    int64 posBeforeChar;
    AutoLock(m_pLock);
    RunChecks();

    // Without an active buffer all of the buffer pointers are bogus, so
    // establish one first.
    if (!m_pActiveIOBuffer) {
        err = SetPosition(GetDataLength());
        if (err) {
            gotoErr(err);
        }
    }

    // Each iteration reads one complete character, which may span buffers.
    for (;;) {
        // Remember where this character starts so we can back up to it.
        posBeforeChar = GetPosition();

        // Depending on the alphabet, a character may be several bytes, so it
        // could straddle buffers. GetChar() deals with that.
        err = GetChar(
                  charBytes,
                  sizeof(charBytes),
                  &numCharBytes);
        if (err) {
            gotoErr(err);
        }

        // Only the first byte of the character is classified.
        charProperties = CStringLib::GetCharProperties(charBytes, 1);
        if (0 != (charProperties & charType)) {
            // Found it. Leave the stream positioned just before it.
            err = SetPosition(posBeforeChar);
            gotoErr(err);
        }
    } // reading one character at a time.

abort:
    returnErr(err);
} // SkipUntilCharType






/////////////////////////////////////////////////////////////////////////////
//
// [FindString]
//
// This procedure searches a stream for a pattern. A search may span several
// buffers and an instance of the pattern may straddle two buffers.
//
// pPattern      - The bytes to look for. Must not be NULL. An empty pattern
//                 (first byte is 0) is treated as "not found".
// patternLength - Number of bytes in pPattern. Must be between 0 and
//                 CStringLib::MAX_SEARCH_PATTERN_SIZE.
// searchOptions - Unused.
// stopPosition  - If greater than 0, the search does not look at bytes at or
//                 beyond this stream position. Values <= 0 mean no limit.
//
// Returns the stream position of the first byte of the match, or -1 if the
// pattern was not found or an error occurred. Errors are not reported in any
// other way.
//
// On return, the stream is positioned at the match if one was found.
// Otherwise it is restored to the position it had when the search started
// (provided the search got far enough to record that position).
/////////////////////////////////////////////////////////////////////////////
int64
CAsyncIOStream::FindString(
                const char *pPattern,
                int32 patternLength,
                int32 searchOptions,
                int64 stopPosition) {
    ErrVal err = ENoErr;
    int64 currentPos;
    int64 savePos = -1;
    int64 matchPosition = -1;
    int32 numBytesInBuffer;
    int32 numBytesToSearch;
    // Holds the tail of one buffer followed by the head of the next, each at
    // most patternLength bytes.
    char straddleBuffer[ (CStringLib::MAX_SEARCH_PATTERN_SIZE * 2) + 8 ];
    int32 numStraddleBytesFromPrevBuffer;
    int32 numStraddleBytesFromCurrentBuffer;
    char *pStartStraddleText;
    // Stream position of straddleBuffer[0]. It is assigned at the end of each
    // loop iteration and only read when numStraddleBytesFromPrevBuffer > 0,
    // which can only be true after that assignment.
    int64 straddleBufferPos;
    const char *pMatchStr;
    bool fHitStopPosition = false;
    AutoLock(m_pLock);
    RunChecks();

    searchOptions = searchOptions; // Unused.
    if ((NULL == pPattern)
        || (patternLength > CStringLib::MAX_SEARCH_PATTERN_SIZE)
        || (patternLength < 0)) {
        gotoErr(EFail);
    }
    // NOTE(review): this logs pPattern with %s, so it presumably expects a
    // NUL-terminated pattern even though a length is also passed - confirm.
    DEBUG_LOG_VERBOSE("CAsyncIOStream::FindString. pattern = %s", pPattern);

    // This is not an error, just a degenerate case.
    if (0 == *pPattern) {
        DEBUG_LOG_VERBOSE("CAsyncIOStream::FindString. Giving up on an empty pattern");
        gotoErr(ENoErr);
    }

    // Make sure we start at a valid buffer. Otherwise, all pointers are bogus.
    // When there is no active buffer, the search starts at position 0.
    if (NULL == m_pActiveIOBuffer) {
        err = SetPosition(0);
        if (err) {
            DEBUG_LOG("CAsyncIOStream::FindString. SetPosition failed. err = %d", err);
            gotoErr(err);
        }
    }
    if ((NULL == m_pEndValidBytes)
        || (NULL == m_pNextValidByte)
        || (NULL == m_pFirstValidByte)
        || (NULL == m_pActiveIOBuffer)) {
        gotoErr(EFail);
    }

    // Remember where the search started, so the position can be restored
    // if nothing is found.
    savePos = m_pActiveIOBuffer->m_PosInMedia + (m_pNextValidByte - m_pFirstValidByte);
    currentPos = savePos;


    // Each iteration of this loop searches one buffer and its
    // straddle with the next buffer.
    numStraddleBytesFromPrevBuffer = 0;
    while (1) {
        // The unread part of the active buffer.
        numBytesInBuffer = m_pEndValidBytes - m_pNextValidByte;

        // If there is a straddle buffer from the previous buffer, then
        // search that now. This finds instances of the pattern that straddled
        // the previous and current buffer.
        if (numStraddleBytesFromPrevBuffer > 0) {
            // Copy the first patternLength bytes of the current buffer into
            // the straddle buffer.
            numStraddleBytesFromCurrentBuffer = patternLength;
            if (numStraddleBytesFromCurrentBuffer > numBytesInBuffer) {
                numStraddleBytesFromCurrentBuffer = numBytesInBuffer;
            }
            memcpy(
                straddleBuffer + numStraddleBytesFromPrevBuffer,
                m_pNextValidByte,
                numStraddleBytesFromCurrentBuffer);

            // Look for the pattern in the straddle buffer.
            // Do not search past the stop position.
            numBytesToSearch = numStraddleBytesFromPrevBuffer + numStraddleBytesFromCurrentBuffer;
            if ((stopPosition > 0) && ((straddleBufferPos + numBytesToSearch) > stopPosition)) {
               numBytesToSearch = (int32) (stopPosition - straddleBufferPos);
               fHitStopPosition = true;
            }

            pMatchStr = CStringLib::FindPatternInBuffer(
                                                    straddleBuffer,
                                                    numBytesToSearch,
                                                    pPattern,
                                                    patternLength);
            if (NULL != pMatchStr) {
                // Translate the offset in the straddle buffer back into a
                // stream position.
                matchPosition = straddleBufferPos + (pMatchStr - straddleBuffer);
                break;
            }
            numStraddleBytesFromPrevBuffer = 0;

            if (fHitStopPosition) {
               gotoErr(err);
            }
        } // if (numStraddleBytesFromPrevBuffer > 0)

        // Do not search past the stop position.
        // NOTE(review): if the search starts beyond stopPosition, this makes
        // numBytesInBuffer negative - confirm FindPatternInBuffer tolerates that.
        if ((stopPosition > 0) && ((currentPos + numBytesInBuffer) > stopPosition)) {
            numBytesInBuffer = (int32) (stopPosition - currentPos);
            fHitStopPosition = true;
        }

        // Look for the pattern in this buffer.
        pMatchStr = CStringLib::FindPatternInBuffer(
                                                m_pNextValidByte,
                                                numBytesInBuffer,
                                                pPattern,
                                                patternLength);
        if (NULL != pMatchStr) {
            matchPosition = currentPos + (pMatchStr - m_pNextValidByte);
            break;
        }

        if (fHitStopPosition) {
           break;
        }

        // Copy the last patternLength bytes of this buffer into
        // the straddle buffer.
        numStraddleBytesFromPrevBuffer = patternLength;
        if (numStraddleBytesFromPrevBuffer > numBytesInBuffer) {
            numStraddleBytesFromPrevBuffer = numBytesInBuffer;
        }
        if (numStraddleBytesFromPrevBuffer < 0) {
            numStraddleBytesFromPrevBuffer = 0;
        }

        pStartStraddleText = m_pNextValidByte
                              + numBytesInBuffer
                              - numStraddleBytesFromPrevBuffer;
        memcpy(
            straddleBuffer,
            pStartStraddleText,
            numStraddleBytesFromPrevBuffer);
        straddleBufferPos = currentPos + (pStartStraddleText - m_pNextValidByte);


        // Go to the next buffer.
        currentPos += numBytesInBuffer;
        err = SetPosition(currentPos);
        if ((err)
            || (NULL == m_pEndValidBytes)
            || (NULL == m_pNextValidByte)) {
            // Running off the end of the stream just means "not found".
            if (EEOF == err) {
               err = ENoErr;
            }
            DEBUG_LOG("CAsyncIOStream::FindString. SetPosition failed. err = %d", err);
            gotoErr(err);
        }
    } // searching in every buffer.

abort:
    // Set the position in the stream.
    // The result of SetPosition is stored in err, but err is not returned.
    if (matchPosition >= 0) {
        err = SetPosition(matchPosition);
    } else if (savePos >= 0) {
        err = SetPosition(savePos);
    }

    return(matchPosition);
} // FindString.





/////////////////////////////////////////////////////////////////////////////
//
// [CAsyncIOEventHandlerSynch]
//
// Start with no semaphore, no stream and no recorded error.
// Initialize() must be called to create the semaphore.
/////////////////////////////////////////////////////////////////////////////
CAsyncIOEventHandlerSynch::CAsyncIOEventHandlerSynch() {
    m_TestErr = ENoErr;
    m_pAsyncIOStream = NULL;
    m_pSemaphore = NULL;
} // CAsyncIOEventHandlerSynch






/////////////////////////////////////////////////////////////////////////////
//
// [~CAsyncIOEventHandlerSynch]
//
// Release the semaphore and the stream reference taken in
// OnOpenAsyncIOStream.
/////////////////////////////////////////////////////////////////////////////
CAsyncIOEventHandlerSynch::~CAsyncIOEventHandlerSynch() {
    RELEASE_OBJECT(m_pSemaphore);
    RELEASE_OBJECT(m_pAsyncIOStream);
} // ~CAsyncIOEventHandlerSynch




/////////////////////////////////////////////////////////////////////////////
//
// [Initialize]
//
// Create and initialize the event that Wait() blocks on and that the
// callback methods signal.
//
// Returns EFail if the event cannot be allocated, or the error reported by
// the event's own Initialize().
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOEventHandlerSynch::Initialize() {
    ErrVal err = ENoErr;

    // Allocate the event object.
    m_pSemaphore = newex CRefEvent;
    if (!m_pSemaphore) {
        gotoErr(EFail);
    }

    // Get it ready for use.
    err = m_pSemaphore->Initialize();
    if (err) {
        gotoErr(err);
    }

abort:
    returnErr(err);
} // Initialize





/////////////////////////////////////////////////////////////////////////////
//
// [Wait]
//
// Wait on the semaphore, which the callback methods signal, and return the
// error most recently recorded by a callback. If there is no semaphore, this
// returns the recorded error immediately.
/////////////////////////////////////////////////////////////////////////////
ErrVal
CAsyncIOEventHandlerSynch::Wait() {
    if (NULL == m_pSemaphore) {
        return(m_TestErr);
    }

    m_pSemaphore->Wait();
    return(m_TestErr);
} // Wait







/////////////////////////////////////////////////////////////////////////////
//
// [OnReadyToRead]
//
// Record the result of the read and signal the semaphore that Wait() uses.
// Only err is used; the other parameters are ignored.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOEventHandlerSynch::OnReadyToRead(
                                ErrVal err,
                                int64 totalBytesAvailable,
                                CAsyncIOStream *pAsyncIOStream,
                                void *pCallbackContext) {
    // Unused parameters
    (void) totalBytesAvailable;
    (void) pAsyncIOStream;
    (void) pCallbackContext;

    // Store the result before signalling, so it is in place for Wait().
    m_TestErr = err;

    if (NULL == m_pSemaphore) {
        return;
    }
    m_pSemaphore->Signal();
} // OnReadyToRead





/////////////////////////////////////////////////////////////////////////////
//
// [OnFlush]
//
// Record the result of the flush and signal the semaphore that Wait() uses.
// Only err is used; the other parameters are ignored.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOEventHandlerSynch::OnFlush(
                                ErrVal err,
                                CAsyncIOStream *pAsyncIOStream,
                                void *pCallbackContext) {
    // Unused parameters
    (void) pAsyncIOStream;
    (void) pCallbackContext;

    // Store the result before signalling, so it is in place for Wait().
    m_TestErr = err;

    if (NULL == m_pSemaphore) {
        return;
    }
    m_pSemaphore->Signal();
} // OnFlush







/////////////////////////////////////////////////////////////////////////////
//
// [OnOpenAsyncIOStream]
//
// Record the result of the open, keep a reference to the opened stream in
// m_pAsyncIOStream (replacing any stream held from an earlier open), and
// signal the semaphore that Wait() uses.
//
// resultErr        - Result of the open. Stored in m_TestErr.
// pAsyncIOStream   - The stream that was opened.
// pCallbackContext - Unused.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOEventHandlerSynch::OnOpenAsyncIOStream(
                               ErrVal resultErr,
                               CAsyncIOStream *pAsyncIOStream,
                               void *pCallbackContext) {
    // Unused parameters
    (void) pCallbackContext;

    // Take the new reference BEFORE dropping the old one. If the same stream
    // is delivered again while we hold the only reference to it, releasing
    // first could destroy the object we are about to keep.
    ADDREF_OBJECT(pAsyncIOStream);
    RELEASE_OBJECT(m_pAsyncIOStream);
    m_pAsyncIOStream = pAsyncIOStream;

    // Store the result before signalling, so it is in place for Wait().
    m_TestErr = resultErr;
    if (m_pSemaphore) {
        m_pSemaphore->Signal();
    }
} // OnOpenAsyncIOStream







/////////////////////////////////////////////////////////////////////////////
//
// [OnAcceptConnection]
//
// Signal the semaphore that Wait() uses. Both parameters are ignored; in
// particular, the new stream is not saved and the recorded error is left
// unchanged.
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOEventHandlerSynch::OnAcceptConnection(
                                CAsyncIOStream *pNewAsyncIOStream,
                                void *pContext) {
    // Unused parameters
    (void) pNewAsyncIOStream;
    (void) pContext;

    if (NULL == m_pSemaphore) {
        return;
    }
    m_pSemaphore->Signal();
} // OnAcceptConnection







/////////////////////////////////////////////////////////////////////////////
//
//                     TESTING PROCEDURES
//
/////////////////////////////////////////////////////////////////////////////
#if INCLUDE_REGRESSION_TESTS

#if LINUX
#include <unistd.h>
#endif

// The synchronous callback used by the tests. It is allocated in
// TestAsyncIOStream.
static CAsyncIOEventHandlerSynch *g_TestCallback = NULL;

// Test helpers.
ErrVal TestCompareStreams(CAsyncIOStream *reader, CAsyncIOStream *writer, bool clipToDestSize);
ErrVal TestCopyStreams(CAsyncIOStream *reader, CAsyncIOStream *writer, bool fBucketIO);

// Watch out, google now has a chunked main page. The higher levels of the
// http code can handle chunking, but not this simple test code.
//static char *g_TestServerName = "www.google.com";
#define DEFAULT_TEST_SERVER     "www.uky.edu"
// The server and port used by the tests. TestAsyncIOStream passes these to
// NetIO_GetLocalProxySettings, and resets them to the defaults if no proxy
// is found.
static char *g_TestServerName = (char *) DEFAULT_TEST_SERVER;
static int g_TestServerPort = 80;





/////////////////////////////////////////////////////////////////////////////
//
// [TestAsyncIOStream]
//
/////////////////////////////////////////////////////////////////////////////
void
CAsyncIOStream::TestAsyncIOStream() {
    ErrVal err = ENoErr;
    CAsyncIOStream *reader = NULL;
    CAsyncIOStream *writer = NULL;
    int i;
    char c;
    CDebugObject timer;
    int64 strOffset;
    char path[512];
    CParsedUrl *pUrl = NULL;
    bool fFoundProxy;
    const char *ptr = "GET / HTTP/1.1\r\n"
            "Accept: */*\r\n"
            "TestHeader: 1\r\n"
            "Accept-Language: en-us\r\n"
            //"Accept-Encoding: gzip, deflate\r\n"
            "User-Agent: dawsonsBrowser\r\n" // Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)\r\n"
            "Host: www.altavista.com\r\n"
            //"Connection: Keep-Alive\r\n"
            "\r\n";



    g_DebugManager.StartModuleTest("Async IO Stream");

    g_TestCallback = newex CAsyncIOEventHandlerSynch;
    if (NULL == g_TestCallback) {
        gotoErr(EFail);
    }
    err = g_TestCallback->Initialize();
    if (err) {
        gotoErr(err);
    }

    fFoundProxy = NetIO_GetLocalProxySettings(&g_TestServerName, &g_TestServerPort);
    if (!fFoundProxy) {
        g_TestServerName = (char *) DEFAULT_TEST_SERVER;
        g_TestServerPort = 80;
    }


    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Synchronously copy one file into another");


    RELEASE_OBJECT(pUrl);
    g_DebugManager.AddTestDataDirectoryPath("sample.html", path, 512);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }

    err = CAsyncIOStream::OpenAsyncIOStream(
                                 pUrl,
                                 CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                                 g_TestCallback,
                                 NULL);
    if (err) {
        gotoErr(err);
    }

    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    reader = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;



    // Make sure that old data doesn't confuse us.
    g_DebugManager.AddTestResultsDirectoryPath("asyncIOStreamTestOutput.txt", path, 512);
    (void) CSimpleFile::DeleteFile(path);

    RELEASE_OBJECT(pUrl);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }
    err = CAsyncIOStream::OpenAsyncIOStream(
                              pUrl,
                              CAsyncBlockIO::CREATE_NEW_STORE | CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                              g_TestCallback,
                              NULL);
    if (err) {
        gotoErr(err);
    }


    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    writer = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;




    reader->ListenForAllBytesToEOF();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }

    //streamSize = reader->GetDataLength();
    reader->SetDebugFlags(CDebugObject::CHECK_STATE_ON_EVERY_OP);
    writer->SetDebugFlags(CDebugObject::CHECK_STATE_ON_EVERY_OP);

    err = TestCopyStreams(reader, writer, true);
    if (err) {
        gotoErr(err);
    }



    writer->Flush();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }

    reader->Close();
    writer->Close();

    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Read a copied file and read until we get an EOF");

    RELEASE_OBJECT(pUrl);
    g_DebugManager.AddTestDataDirectoryPath("sample.html", path, 512);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }

    err = CAsyncIOStream::OpenAsyncIOStream(
                                pUrl,
                                CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                                g_TestCallback,
                                NULL);
    if (err) {
        gotoErr(err);
    }


    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    reader = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;


    g_DebugManager.AddTestResultsDirectoryPath("asyncIOStreamTestOutput.txt", path, 512);
    RELEASE_OBJECT(pUrl);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }

    err = CAsyncIOStream::OpenAsyncIOStream(
                        pUrl,
                        CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                        g_TestCallback,
                        NULL);
    if (err) {
        gotoErr(err);
    }

    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    writer = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;


    reader->ListenForAllBytesToEOF();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }


    writer->ListenForAllBytesToEOF();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }


    err = TestCompareStreams(reader, writer, false);
    if (err) {
        gotoErr(err);
    }
    err = TestCompareStreams(writer, reader, false);
    if (err) {
        gotoErr(err);
    }



    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Read at EOF several times");
    for (i = 0; i < 10; i++) {
        err = reader->GetByte(&c);
        if (EEOF != err) {
            DEBUG_WARNING("Missed an EOF");
        }
    }


    writer->Close();
    reader->Close();


    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Synchronously copy a network link into a file");

    {
        bool fFoundProxy;
        char buffer[CParsedUrl::MAX_URL_LENGTH];

        fFoundProxy = NetIO_GetLocalProxySettings(&g_TestServerName, &g_TestServerPort);
        if (!fFoundProxy) {
            g_TestServerName = (char *) DEFAULT_TEST_SERVER;
            g_TestServerPort = 80;
        }
        snprintf(
            buffer,
            CParsedUrl::MAX_URL_LENGTH,
            "http://%s:%d",
            g_TestServerName,
            g_TestServerPort);

        RELEASE_OBJECT(pUrl);
        pUrl = CParsedUrl::AllocateUrl(buffer);
        if (NULL == pUrl) {
            gotoErr(EFail);
        }
    }


    err = CAsyncIOStream::OpenAsyncIOStream(pUrl, 0, g_TestCallback, NULL);
    if (err) {
        gotoErr(err);
    }
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    reader = (CAsyncIOStream *) (g_TestCallback->m_pAsyncIOStream);
    g_TestCallback->m_pAsyncIOStream = NULL;



    err = reader->Write(ptr, strlen(ptr));
    if (err) {
        gotoErr(err);
    }

    reader->Flush();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }


    // Make sure that old data doesn't confuse us.
    g_DebugManager.AddTestResultsDirectoryPath("asyncIOStreamTestOutput.txt", path, 512);
    ::unlink(path);
    RELEASE_OBJECT(pUrl);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }

    err = CAsyncIOStream::OpenAsyncIOStream(
                        pUrl,
                        CAsyncBlockIO::CREATE_NEW_STORE | CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                        g_TestCallback,
                        NULL);
    if (err) {
        gotoErr(err);
    }

    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    writer = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;



    reader->ListenForNBytes(-1, 100);
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }


    reader->SetDebugFlags(CDebugObject::CHECK_STATE_ON_EVERY_OP);
    writer->SetDebugFlags(CDebugObject::CHECK_STATE_ON_EVERY_OP);

    err = TestCopyStreams(reader, writer, false);
    if (err) {
        gotoErr(err);
    }

    writer->Flush();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }



    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Read a copied stream and read until we get an EOF");


    writer->Close();

    g_DebugManager.AddTestResultsDirectoryPath("asyncIOStreamTestOutput.txt", path, 512);
    RELEASE_OBJECT(pUrl);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }

    err = CAsyncIOStream::OpenAsyncIOStream(
                        pUrl,
                        CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                        g_TestCallback,
                        NULL);
    if (err) {
        gotoErr(err);
    }


    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    writer = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;


    writer->ListenForAllBytesToEOF();
    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }


    err = TestCompareStreams(reader, writer, true);
    if (err) {
        gotoErr(err);
    }
    err = TestCompareStreams(writer, reader, true);
    if (err) {
        gotoErr(err);
    }


    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Read at EOF several times");
    {
        int64 streamSize = 0;
        int64 streamPos = 0;
        streamSize = writer->GetDataLength();
        streamPos = writer->GetPosition();
        ::printf("\n>>streamSize = %d, stremPos=%d", ((int32)streamSize), ((int32) streamPos));
    }
    // BE Careful. Use the writer stream. This is a local file, which won't change
    // size. The reader is still pulling in packets from the network, so it may
    // no longer be at the end of the stream if new packets arrived.
    for (i = 0; i < 10; i++) {
        err = writer->GetByte(&c);
        if (EEOF != err) {
            DEBUG_WARNING("Missed an EOF");
        }
    }



    //////////////////////////////////////////////////////////
    g_DebugManager.StartTest("Search for a string in a file");

    RELEASE_OBJECT(pUrl);
    g_DebugManager.AddTestDataDirectoryPath("sample.html", path, 512);
    pUrl = CParsedUrl::AllocateFileUrl(path);
    if (NULL == pUrl) {
        gotoErr(EFail);
    }

    err = CAsyncIOStream::OpenAsyncIOStream(
                        pUrl,
                        CAsyncBlockIO::READ_ACCESS | CAsyncBlockIO::WRITE_ACCESS,
                        g_TestCallback,
                        NULL);
    if (err) {
        gotoErr(err);
    }

    err = g_TestCallback->Wait();
    if (err) {
        gotoErr(err);
    }
    reader = g_TestCallback->m_pAsyncIOStream;
    g_TestCallback->m_pAsyncIOStream = NULL;

    strOffset = reader->FindString(
                           "CAsyncIOStream::",
                           13, //patternSize,
                           0, // flags
                           -1);
    strOffset = strOffset; // Unused (for now)

    strOffset = reader->FindString(
                           "&*()MissingPattern!@#$",
                           13, //patternSize,
                           0, // flags
                           -1);
    strOffset = strOffset; // Unused (for now)

abort:
    if (err) {
        ::printf("\nError: (%d)\n", ERROR_CODE(err));
    }

    if (reader) {
        reader->Close();
    }
    if (writer) {
        writer->Close();
    }
    RELEASE_OBJECT(reader);
    RELEASE_OBJECT(writer);
    RELEASE_OBJECT(pUrl);
} // TestAsyncIOStream.








/////////////////////////////////////////////////////////////////////////////
//
// [TestCopyStreams]
//
// Copies reader into writer one byte at a time, starting at reader offset 0
// and stopping at the first EEOF from the reader. Along the way it exercises
// GetByte/UnGetByte: each byte is read and pushed back three times before it
// is consumed, and pushed back and re-read three times after it is written.
//
// fBucketIO - when true, warn if the number of bytes copied differs from the
//             length the reader reported before the copy started.
//
// Returns ENoErr once the reader hits EOF, EFail when a re-read byte does not
// match the byte that was consumed, and otherwise the error returned by the
// stream call that failed.
/////////////////////////////////////////////////////////////////////////////
ErrVal
TestCopyStreams(
            CAsyncIOStream *reader,
            CAsyncIOStream *writer,
            bool fBucketIO) {
    ErrVal err = ENoErr;
    char c;
    char preReadC;
    char postReadC;
    int32 trialNum;
    // Same width as streamSize so the final comparison is not mixed-width.
    int64 byteNum = 0;
    int64 streamSize = 0;

    err = reader->SetPosition(0);
    if (err) {
        gotoErr(err);
    }

    streamSize = reader->GetDataLength();
    byteNum = 0;
    while (1) {
        // Peek at the next byte several times; every peek is pushed back.
        for (trialNum = 0; trialNum < 3; trialNum++) {
            err = reader->GetByte(&preReadC);
            // An error may only mean EOF.
            if (EEOF == err) {
                gotoErr(ENoErr);
            } else if (err) {
                gotoErr(err);
            }

            err = reader->UnGetByte();
            if (err) {
                gotoErr(err);
            }
        }

        // Now consume the byte for real. Report the stream's own error
        // rather than a generic EFail, as TestCompareStreams does.
        err = reader->GetByte(&c);
        if (err) {
            gotoErr(err);
        }

        if (preReadC != c) {
            gotoErr(EFail);
        }

        err = writer->PutByte(c);
        if (err) {
            gotoErr(err);
        }


        // Push the byte back and re-read it; it must not change.
        for (trialNum = 0; trialNum < 3; trialNum++) {
            err = reader->UnGetByte();
            if (err) {
                gotoErr(err);
            }

            err = reader->GetByte(&postReadC);
            if (err) {
                gotoErr(err);
            }

            if (postReadC != c) {
                gotoErr(EFail);
            }
        }

        byteNum++;
    }

abort:
    // This test does not make sense if this is a pipe IO, since new packets
    // may be arriving throughout the test.
    if ((fBucketIO) && (byteNum != streamSize)) {
        DEBUG_WARNING("Bad copy");
    }

    returnErr(err);
} // TestCopyStreams.








/////////////////////////////////////////////////////////////////////////////
//
// [TestCompareStreams]
//
// Rewinds both streams and checks that they hold the same bytes. Each byte
// of "reader" is first read and pushed back three times, then consumed and
// compared with the next byte of "writer".
//
// The comparison ends successfully when "reader" reports EEOF. With
// fClipToCopiedSize set, it also ends after as many bytes as "writer"
// reported in GetDataLength(). Use that when comparing a network read with
// its local file copy: on a slow network, packets may arrive after the copy
// finished, so the network stream can be longer than the copy.
/////////////////////////////////////////////////////////////////////////////
ErrVal
TestCompareStreams(CAsyncIOStream *reader, CAsyncIOStream *writer, bool fClipToCopiedSize) {
    ErrVal err = ENoErr;
    char readerByte;
    char writerByte;
    char peekedByte;
    int32 peekPass;
    int32 numCompared = 0;
    int64 compareLimit = 0;

    // Both streams are compared from their first byte.
    err = reader->SetPosition(0);
    if (err) {
        gotoErr(err);
    }
    err = writer->SetPosition(0);
    if (err) {
        gotoErr(err);
    }

    compareLimit = writer->GetDataLength();

    numCompared = 0;
    for (;;) {
        // A clipped comparison stops at the length reported by the writer.
        if ((fClipToCopiedSize) && (numCompared >= compareLimit)) {
            break;
        }

        // Peek at the next reader byte a few times; each peek is undone.
        for (peekPass = 0; peekPass < 3; peekPass++) {
            err = reader->GetByte(&peekedByte);
            if (EEOF == err) {
                // Running out of reader bytes is the normal way to finish.
                err = ENoErr;
                gotoErr(ENoErr);
            }
            if (err) {
                gotoErr(err);
            }

            err = reader->UnGetByte();
            if (err) {
                gotoErr(err);
            }
        }

        // Consume the byte; it has to match what the peeks returned.
        err = reader->GetByte(&readerByte);
        if (err) {
            gotoErr(err);
        }
        if (readerByte != peekedByte) {
            gotoErr(EFail);
        }

        // The other stream must hold the same byte at this position.
        err = writer->GetByte(&writerByte);
        if (err) {
            gotoErr(err);
        }
        if (readerByte != writerByte) {
            gotoErr(EFail);
        }

        numCompared++;
    }

abort:
    (void) numCompared;
    returnErr(err);
} // TestCompareStreams.



#endif // INCLUDE_REGRESSION_TESTS