000001  /*
000002  ** 2011-07-09
000003  **
000004  ** The author disclaims copyright to this source code.  In place of
000005  ** a legal notice, here is a blessing:
000006  **
000007  **    May you do good and not evil.
000008  **    May you find forgiveness for yourself and forgive others.
000009  **    May you share freely, never taking more than you give.
000010  **
000011  *************************************************************************
000012  ** This file contains code for the VdbeSorter object, used in concert with
000013  ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
000014  ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
000015  ** using indexes and without LIMIT clauses.
000016  **
000017  ** The VdbeSorter object implements a multi-threaded external merge sort
000018  ** algorithm that is efficient even if the number of elements being sorted
000019  ** exceeds the available memory.
000020  **
000021  ** Here is the (internal, non-API) interface between this module and the
000022  ** rest of the SQLite system:
000023  **
000024  **    sqlite3VdbeSorterInit()       Create a new VdbeSorter object.
000025  **
000026  **    sqlite3VdbeSorterWrite()      Add a single new row to the VdbeSorter
000027  **                                  object.  The row is a binary blob in the
000028  **                                  OP_MakeRecord format that contains both
000029  **                                  the ORDER BY key columns and result columns
000030  **                                  in the case of a SELECT w/ ORDER BY, or
000031  **                                  the complete record for an index entry
000032  **                                  in the case of a CREATE INDEX.
000033  **
000034  **    sqlite3VdbeSorterRewind()     Sort all content previously added.
000035  **                                  Position the read cursor on the
000036  **                                  first sorted element.
000037  **
000038  **    sqlite3VdbeSorterNext()       Advance the read cursor to the next sorted
000039  **                                  element.
000040  **
000041  **    sqlite3VdbeSorterRowkey()     Return the complete binary blob for the
000042  **                                  row currently under the read cursor.
000043  **
000044  **    sqlite3VdbeSorterCompare()    Compare the binary blob for the row
000045  **                                  currently under the read cursor against
000046  **                                  another binary blob X and report if
000047  **                                  X is strictly less than the read cursor.
000048  **                                  Used to enforce uniqueness in a
000049  **                                  CREATE UNIQUE INDEX statement.
000050  **
000051  **    sqlite3VdbeSorterClose()      Close the VdbeSorter object and reclaim
000052  **                                  all resources.
000053  **
000054  **    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
000055  **                                  is like Close() followed by Init() only
000056  **                                  much faster.
000057  **
000058  ** The interfaces above must be called in a particular order.  Write() can 
000059  ** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
000060  ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
000061  **
000062  **   Init()
000063  **   for each record: Write()
000064  **   Rewind()
000065  **     Rowkey()/Compare()
000066  **   Next() 
000067  **   Close()
000068  **
000069  ** Algorithm:
000070  **
000071  ** Records passed to the sorter via calls to Write() are initially held 
000072  ** unsorted in main memory. Assuming the amount of memory used never exceeds
000073  ** a threshold, when Rewind() is called the set of records is sorted using
000074  ** an in-memory merge sort. In this case, no temporary files are required
000075  ** and subsequent calls to Rowkey(), Next() and Compare() read records 
000076  ** directly from main memory.
000077  **
000078  ** If the amount of space used to store records in main memory exceeds the
000079  ** threshold, then the set of records currently in memory are sorted and
000080  ** written to a temporary file in "Packed Memory Array" (PMA) format.
000081  ** A PMA created at this point is known as a "level-0 PMA". Higher levels
000082  ** of PMAs may be created by merging existing PMAs together - for example
000083  ** merging two or more level-0 PMAs together creates a level-1 PMA.
000084  **
000085  ** The threshold for the amount of main memory to use before flushing 
000086  ** records to a PMA is roughly the same as the limit configured for the
000087  ** page-cache of the main database. Specifically, the threshold is set to 
000088  ** the value returned by "PRAGMA main.page_size" multipled by 
000089  ** that returned by "PRAGMA main.cache_size", in bytes.
000090  **
000091  ** If the sorter is running in single-threaded mode, then all PMAs generated
000092  ** are appended to a single temporary file. Or, if the sorter is running in
000093  ** multi-threaded mode then up to (N+1) temporary files may be opened, where
000094  ** N is the configured number of worker threads. In this case, instead of
000095  ** sorting the records and writing the PMA to a temporary file itself, the
000096  ** calling thread usually launches a worker thread to do so. Except, if
000097  ** there are already N worker threads running, the main thread does the work
000098  ** itself.
000099  **
000100  ** The sorter is running in multi-threaded mode if (a) the library was built
000101  ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
000102  ** than zero, and (b) worker threads have been enabled at runtime by calling
000103  ** "PRAGMA threads=N" with some value of N greater than 0.
000104  **
000105  ** When Rewind() is called, any data remaining in memory is flushed to a 
000106  ** final PMA. So at this point the data is stored in some number of sorted
000107  ** PMAs within temporary files on disk.
000108  **
000109  ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
000110  ** sorter is running in single-threaded mode, then these PMAs are merged
000111  ** incrementally as keys are retreived from the sorter by the VDBE.  The
000112  ** MergeEngine object, described in further detail below, performs this
000113  ** merge.
000114  **
000115  ** Or, if running in multi-threaded mode, then a background thread is
000116  ** launched to merge the existing PMAs. Once the background thread has
000117  ** merged T bytes of data into a single sorted PMA, the main thread 
000118  ** begins reading keys from that PMA while the background thread proceeds
000119  ** with merging the next T bytes of data. And so on.
000120  **
000121  ** Parameter T is set to half the value of the memory threshold used 
000122  ** by Write() above to determine when to create a new PMA.
000123  **
000124  ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when 
000125  ** Rewind() is called, then a hierarchy of incremental-merges is used. 
000126  ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on 
000127  ** disk are merged together. Then T bytes of data from the second set, and
000128  ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
000129  ** PMAs at a time. This done is to improve locality.
000130  **
000131  ** If running in multi-threaded mode and there are more than
000132  ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
000133  ** than one background thread may be created. Specifically, there may be
000134  ** one background thread for each temporary file on disk, and one background
000135  ** thread to merge the output of each of the others to a single PMA for
000136  ** the main thread to read from.
000137  */
000138  #include "sqliteInt.h"
000139  #include "vdbeInt.h"
000140  
000141  /* 
000142  ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
000143  ** messages to stderr that may be helpful in understanding the performance
000144  ** characteristics of the sorter in multi-threaded mode.
000145  */
000146  #if 0
000147  # define SQLITE_DEBUG_SORTER_THREADS 1
000148  #endif
000149  
000150  /*
000151  ** Hard-coded maximum amount of data to accumulate in memory before flushing
000152  ** to a level 0 PMA. The purpose of this limit is to prevent various integer
000153  ** overflows. 512MiB.
000154  */
000155  #define SQLITE_MAX_PMASZ    (1<<29)
000156  
000157  /*
000158  ** Private objects used by the sorter
000159  */
000160  typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
000161  typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
000162  typedef struct PmaWriter PmaWriter;         /* Incrementally write one PMA */
000163  typedef struct SorterRecord SorterRecord;   /* A record being sorted */
000164  typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
000165  typedef struct SorterFile SorterFile;       /* Temporary file object wrapper */
000166  typedef struct SorterList SorterList;       /* In-memory list of records */
000167  typedef struct IncrMerger IncrMerger;       /* Read & merge multiple PMAs */
000168  
000169  /*
000170  ** A container for a temp file handle and the current amount of data 
000171  ** stored in the file.
000172  */
000173  struct SorterFile {
000174    sqlite3_file *pFd;              /* File handle */
000175    i64 iEof;                       /* Bytes of data stored in pFd */
000176  };
000177  
000178  /*
000179  ** An in-memory list of objects to be sorted.
000180  **
000181  ** If aMemory==0 then each object is allocated separately and the objects
000182  ** are connected using SorterRecord.u.pNext.  If aMemory!=0 then all objects
000183  ** are stored in the aMemory[] bulk memory, one right after the other, and
000184  ** are connected using SorterRecord.u.iNext.
000185  */
000186  struct SorterList {
000187    SorterRecord *pList;            /* Linked list of records */
000188    u8 *aMemory;                    /* If non-NULL, bulk memory to hold pList */
000189    int szPMA;                      /* Size of pList as PMA in bytes */
000190  };
000191  
000192  /*
000193  ** The MergeEngine object is used to combine two or more smaller PMAs into
000194  ** one big PMA using a merge operation.  Separate PMAs all need to be
000195  ** combined into one big PMA in order to be able to step through the sorted
000196  ** records in order.
000197  **
000198  ** The aReadr[] array contains a PmaReader object for each of the PMAs being
000199  ** merged.  An aReadr[] object either points to a valid key or else is at EOF.
000200  ** ("EOF" means "End Of File".  When aReadr[] is at EOF there is no more data.)
000201  ** For the purposes of the paragraphs below, we assume that the array is
000202  ** actually N elements in size, where N is the smallest power of 2 greater
000203  ** to or equal to the number of PMAs being merged. The extra aReadr[] elements
000204  ** are treated as if they are empty (always at EOF).
000205  **
000206  ** The aTree[] array is also N elements in size. The value of N is stored in
000207  ** the MergeEngine.nTree variable.
000208  **
000209  ** The final (N/2) elements of aTree[] contain the results of comparing
000210  ** pairs of PMA keys together. Element i contains the result of 
000211  ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
000212  ** aTree element is set to the index of it. 
000213  **
000214  ** For the purposes of this comparison, EOF is considered greater than any
000215  ** other key value. If the keys are equal (only possible with two EOF
000216  ** values), it doesn't matter which index is stored.
000217  **
000218  ** The (N/4) elements of aTree[] that precede the final (N/2) described 
000219  ** above contains the index of the smallest of each block of 4 PmaReaders
000220  ** And so on. So that aTree[1] contains the index of the PmaReader that 
000221  ** currently points to the smallest key value. aTree[0] is unused.
000222  **
000223  ** Example:
000224  **
000225  **     aReadr[0] -> Banana
000226  **     aReadr[1] -> Feijoa
000227  **     aReadr[2] -> Elderberry
000228  **     aReadr[3] -> Currant
000229  **     aReadr[4] -> Grapefruit
000230  **     aReadr[5] -> Apple
000231  **     aReadr[6] -> Durian
000232  **     aReadr[7] -> EOF
000233  **
000234  **     aTree[] = { X, 5   0, 5    0, 3, 5, 6 }
000235  **
000236  ** The current element is "Apple" (the value of the key indicated by 
000237  ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
000238  ** be advanced to the next key in its segment. Say the next key is
000239  ** "Eggplant":
000240  **
000241  **     aReadr[5] -> Eggplant
000242  **
000243  ** The contents of aTree[] are updated first by comparing the new PmaReader
000244  ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
000245  ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
000246  ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
000247  ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
000248  ** so the value written into element 1 of the array is 0. As follows:
000249  **
000250  **     aTree[] = { X, 0   0, 6    0, 3, 5, 6 }
000251  **
000252  ** In other words, each time we advance to the next sorter element, log2(N)
000253  ** key comparison operations are required, where N is the number of segments
000254  ** being merged (rounded up to the next power of 2).
000255  */
000256  struct MergeEngine {
000257    int nTree;                 /* Used size of aTree/aReadr (power of 2) */
000258    SortSubtask *pTask;        /* Used by this thread only */
000259    int *aTree;                /* Current state of incremental merge */
000260    PmaReader *aReadr;         /* Array of PmaReaders to merge data from */
000261  };
000262  
000263  /*
000264  ** This object represents a single thread of control in a sort operation.
000265  ** Exactly VdbeSorter.nTask instances of this object are allocated
000266  ** as part of each VdbeSorter object. Instances are never allocated any
000267  ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
000268  ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).  Thus for
000269  ** single-threaded operation, there is exactly one instance of this object
000270  ** and for multi-threaded operation there are two or more instances.
000271  **
000272  ** Essentially, this structure contains all those fields of the VdbeSorter
000273  ** structure for which each thread requires a separate instance. For example,
000274  ** each thread requries its own UnpackedRecord object to unpack records in
000275  ** as part of comparison operations.
000276  **
000277  ** Before a background thread is launched, variable bDone is set to 0. Then, 
000278  ** right before it exits, the thread itself sets bDone to 1. This is used for 
000279  ** two purposes:
000280  **
000281  **   1. When flushing the contents of memory to a level-0 PMA on disk, to
000282  **      attempt to select a SortSubtask for which there is not already an
000283  **      active background thread (since doing so causes the main thread
000284  **      to block until it finishes).
000285  **
000286  **   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
000287  **      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
000288  **      block provoke debugging output.
000289  **
000290  ** In both cases, the effects of the main thread seeing (bDone==0) even
000291  ** after the thread has finished are not dire. So we don't worry about
000292  ** memory barriers and such here.
000293  */
000294  typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
000295  struct SortSubtask {
000296    SQLiteThread *pThread;          /* Background thread, if any */
000297    int bDone;                      /* Set if thread is finished but not joined */
000298    VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
000299    UnpackedRecord *pUnpacked;      /* Space to unpack a record */
000300    SorterList list;                /* List for thread to write to a PMA */
000301    int nPMA;                       /* Number of PMAs currently in file */
000302    SorterCompare xCompare;         /* Compare function to use */
000303    SorterFile file;                /* Temp file for level-0 PMAs */
000304    SorterFile file2;               /* Space for other PMAs */
000305  };
000306  
000307  
000308  /*
000309  ** Main sorter structure. A single instance of this is allocated for each 
000310  ** sorter cursor created by the VDBE.
000311  **
000312  ** mxKeysize:
000313  **   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
000314  **   this variable is updated so as to be set to the size on disk of the
000315  **   largest record in the sorter.
000316  */
000317  struct VdbeSorter {
000318    int mnPmaSize;                  /* Minimum PMA size, in bytes */
000319    int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
000320    int mxKeysize;                  /* Largest serialized key seen so far */
000321    int pgsz;                       /* Main database page size */
000322    PmaReader *pReader;             /* Readr data from here after Rewind() */
000323    MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
000324    sqlite3 *db;                    /* Database connection */
000325    KeyInfo *pKeyInfo;              /* How to compare records */
000326    UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
000327    SorterList list;                /* List of in-memory records */
000328    int iMemory;                    /* Offset of free space in list.aMemory */
000329    int nMemory;                    /* Size of list.aMemory allocation in bytes */
000330    u8 bUsePMA;                     /* True if one or more PMAs created */
000331    u8 bUseThreads;                 /* True to use background threads */
000332    u8 iPrev;                       /* Previous thread used to flush PMA */
000333    u8 nTask;                       /* Size of aTask[] array */
000334    u8 typeMask;
000335    SortSubtask aTask[1];           /* One or more subtasks */
000336  };
000337  
000338  #define SORTER_TYPE_INTEGER 0x01
000339  #define SORTER_TYPE_TEXT    0x02
000340  
000341  /*
000342  ** An instance of the following object is used to read records out of a
000343  ** PMA, in sorted order.  The next key to be read is cached in nKey/aKey.
000344  ** aKey might point into aMap or into aBuffer.  If neither of those locations
000345  ** contain a contiguous representation of the key, then aAlloc is allocated
000346  ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc.
000347  **
000348  ** pFd==0 at EOF.
000349  */
000350  struct PmaReader {
000351    i64 iReadOff;               /* Current read offset */
000352    i64 iEof;                   /* 1 byte past EOF for this PmaReader */
000353    int nAlloc;                 /* Bytes of space at aAlloc */
000354    int nKey;                   /* Number of bytes in key */
000355    sqlite3_file *pFd;          /* File handle we are reading from */
000356    u8 *aAlloc;                 /* Space for aKey if aBuffer and pMap wont work */
000357    u8 *aKey;                   /* Pointer to current key */
000358    u8 *aBuffer;                /* Current read buffer */
000359    int nBuffer;                /* Size of read buffer in bytes */
000360    u8 *aMap;                   /* Pointer to mapping of entire file */
000361    IncrMerger *pIncr;          /* Incremental merger */
000362  };
000363  
000364  /*
000365  ** Normally, a PmaReader object iterates through an existing PMA stored 
000366  ** within a temp file. However, if the PmaReader.pIncr variable points to
000367  ** an object of the following type, it may be used to iterate/merge through
000368  ** multiple PMAs simultaneously.
000369  **
000370  ** There are two types of IncrMerger object - single (bUseThread==0) and 
000371  ** multi-threaded (bUseThread==1). 
000372  **
000373  ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 
000374  ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 
000375  ** size. When the IncrMerger is initialized, it reads enough data from 
000376  ** pMerger to populate aFile[0]. It then sets variables within the 
000377  ** corresponding PmaReader object to read from that file and kicks off 
000378  ** a background thread to populate aFile[1] with the next mxSz bytes of 
000379  ** sorted record data from pMerger. 
000380  **
000381  ** When the PmaReader reaches the end of aFile[0], it blocks until the
000382  ** background thread has finished populating aFile[1]. It then exchanges
000383  ** the contents of the aFile[0] and aFile[1] variables within this structure,
000384  ** sets the PmaReader fields to read from the new aFile[0] and kicks off
000385  ** another background thread to populate the new aFile[1]. And so on, until
000386  ** the contents of pMerger are exhausted.
000387  **
000388  ** A single-threaded IncrMerger does not open any temporary files of its
000389  ** own. Instead, it has exclusive access to mxSz bytes of space beginning
000390  ** at offset iStartOff of file pTask->file2. And instead of using a 
000391  ** background thread to prepare data for the PmaReader, with a single
000392  ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
000393  ** keys from pMerger by the calling thread whenever the PmaReader runs out
000394  ** of data.
000395  */
000396  struct IncrMerger {
000397    SortSubtask *pTask;             /* Task that owns this merger */
000398    MergeEngine *pMerger;           /* Merge engine thread reads data from */
000399    i64 iStartOff;                  /* Offset to start writing file at */
000400    int mxSz;                       /* Maximum bytes of data to store */
000401    int bEof;                       /* Set to true when merge is finished */
000402    int bUseThread;                 /* True to use a bg thread for this object */
000403    SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
000404  };
000405  
000406  /*
000407  ** An instance of this object is used for writing a PMA.
000408  **
000409  ** The PMA is written one record at a time.  Each record is of an arbitrary
000410  ** size.  But I/O is more efficient if it occurs in page-sized blocks where
000411  ** each block is aligned on a page boundary.  This object caches writes to
000412  ** the PMA so that aligned, page-size blocks are written.
000413  */
000414  struct PmaWriter {
000415    int eFWErr;                     /* Non-zero if in an error state */
000416    u8 *aBuffer;                    /* Pointer to write buffer */
000417    int nBuffer;                    /* Size of write buffer in bytes */
000418    int iBufStart;                  /* First byte of buffer to write */
000419    int iBufEnd;                    /* Last byte of buffer to write */
000420    i64 iWriteOff;                  /* Offset of start of buffer in file */
000421    sqlite3_file *pFd;              /* File handle to write to */
000422  };
000423  
000424  /*
000425  ** This object is the header on a single record while that record is being
000426  ** held in memory and prior to being written out as part of a PMA.
000427  **
000428  ** How the linked list is connected depends on how memory is being managed
000429  ** by this module. If using a separate allocation for each in-memory record
000430  ** (VdbeSorter.list.aMemory==0), then the list is always connected using the
000431  ** SorterRecord.u.pNext pointers.
000432  **
000433  ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
000434  ** then while records are being accumulated the list is linked using the
000435  ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
000436  ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
000437  ** has finished passing records to the sorter, or when the in-memory buffer
000438  ** is full, the list is sorted. As part of the sorting process, it is
000439  ** converted to use the SorterRecord.u.pNext pointers. See function
000440  ** vdbeSorterSort() for details.
000441  */
000442  struct SorterRecord {
000443    int nVal;                       /* Size of the record in bytes */
000444    union {
000445      SorterRecord *pNext;          /* Pointer to next record in list */
000446      int iNext;                    /* Offset within aMemory of next record */
000447    } u;
000448    /* The data for the record immediately follows this header */
000449  };
000450  
000451  /* Return a pointer to the buffer containing the record data for SorterRecord
000452  ** object p. Should be used as if:
000453  **
000454  **   void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
000455  */
000456  #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
000457  
000458  
000459  /* Maximum number of PMAs that a single MergeEngine can merge */
000460  #define SORTER_MAX_MERGE_COUNT 16
000461  
000462  static int vdbeIncrSwap(IncrMerger*);
000463  static void vdbeIncrFree(IncrMerger *);
000464  
000465  /*
000466  ** Free all memory belonging to the PmaReader object passed as the
000467  ** argument. All structure fields are set to zero before returning.
000468  */
000469  static void vdbePmaReaderClear(PmaReader *pReadr){
000470    sqlite3_free(pReadr->aAlloc);
000471    sqlite3_free(pReadr->aBuffer);
000472    if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
000473    vdbeIncrFree(pReadr->pIncr);
000474    memset(pReadr, 0, sizeof(PmaReader));
000475  }
000476  
000477  /*
000478  ** Read the next nByte bytes of data from the PMA p.
000479  ** If successful, set *ppOut to point to a buffer containing the data
000480  ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
000481  ** error code.
000482  **
000483  ** The buffer returned in *ppOut is only valid until the
000484  ** next call to this function.
000485  */
000486  static int vdbePmaReadBlob(
000487    PmaReader *p,                   /* PmaReader from which to take the blob */
000488    int nByte,                      /* Bytes of data to read */
000489    u8 **ppOut                      /* OUT: Pointer to buffer containing data */
000490  ){
000491    int iBuf;                       /* Offset within buffer to read from */
000492    int nAvail;                     /* Bytes of data available in buffer */
000493  
000494    if( p->aMap ){
000495      *ppOut = &p->aMap[p->iReadOff];
000496      p->iReadOff += nByte;
000497      return SQLITE_OK;
000498    }
000499  
000500    assert( p->aBuffer );
000501  
000502    /* If there is no more data to be read from the buffer, read the next 
000503    ** p->nBuffer bytes of data from the file into it. Or, if there are less
000504    ** than p->nBuffer bytes remaining in the PMA, read all remaining data.  */
000505    iBuf = p->iReadOff % p->nBuffer;
000506    if( iBuf==0 ){
000507      int nRead;                    /* Bytes to read from disk */
000508      int rc;                       /* sqlite3OsRead() return code */
000509  
000510      /* Determine how many bytes of data to read. */
000511      if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
000512        nRead = p->nBuffer;
000513      }else{
000514        nRead = (int)(p->iEof - p->iReadOff);
000515      }
000516      assert( nRead>0 );
000517  
000518      /* Readr data from the file. Return early if an error occurs. */
000519      rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
000520      assert( rc!=SQLITE_IOERR_SHORT_READ );
000521      if( rc!=SQLITE_OK ) return rc;
000522    }
000523    nAvail = p->nBuffer - iBuf; 
000524  
000525    if( nByte<=nAvail ){
000526      /* The requested data is available in the in-memory buffer. In this
000527      ** case there is no need to make a copy of the data, just return a 
000528      ** pointer into the buffer to the caller.  */
000529      *ppOut = &p->aBuffer[iBuf];
000530      p->iReadOff += nByte;
000531    }else{
000532      /* The requested data is not all available in the in-memory buffer.
000533      ** In this case, allocate space at p->aAlloc[] to copy the requested
000534      ** range into. Then return a copy of pointer p->aAlloc to the caller.  */
000535      int nRem;                     /* Bytes remaining to copy */
000536  
000537      /* Extend the p->aAlloc[] allocation if required. */
000538      if( p->nAlloc<nByte ){
000539        u8 *aNew;
000540        int nNew = MAX(128, p->nAlloc*2);
000541        while( nByte>nNew ) nNew = nNew*2;
000542        aNew = sqlite3Realloc(p->aAlloc, nNew);
000543        if( !aNew ) return SQLITE_NOMEM_BKPT;
000544        p->nAlloc = nNew;
000545        p->aAlloc = aNew;
000546      }
000547  
000548      /* Copy as much data as is available in the buffer into the start of
000549      ** p->aAlloc[].  */
000550      memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
000551      p->iReadOff += nAvail;
000552      nRem = nByte - nAvail;
000553  
000554      /* The following loop copies up to p->nBuffer bytes per iteration into
000555      ** the p->aAlloc[] buffer.  */
000556      while( nRem>0 ){
000557        int rc;                     /* vdbePmaReadBlob() return code */
000558        int nCopy;                  /* Number of bytes to copy */
000559        u8 *aNext;                  /* Pointer to buffer to copy data from */
000560  
000561        nCopy = nRem;
000562        if( nRem>p->nBuffer ) nCopy = p->nBuffer;
000563        rc = vdbePmaReadBlob(p, nCopy, &aNext);
000564        if( rc!=SQLITE_OK ) return rc;
000565        assert( aNext!=p->aAlloc );
000566        memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
000567        nRem -= nCopy;
000568      }
000569  
000570      *ppOut = p->aAlloc;
000571    }
000572  
000573    return SQLITE_OK;
000574  }
000575  
000576  /*
000577  ** Read a varint from the stream of data accessed by p. Set *pnOut to
000578  ** the value read.
000579  */
000580  static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
000581    int iBuf;
000582  
000583    if( p->aMap ){
000584      p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
000585    }else{
000586      iBuf = p->iReadOff % p->nBuffer;
000587      if( iBuf && (p->nBuffer-iBuf)>=9 ){
000588        p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
000589      }else{
000590        u8 aVarint[16], *a;
000591        int i = 0, rc;
000592        do{
000593          rc = vdbePmaReadBlob(p, 1, &a);
000594          if( rc ) return rc;
000595          aVarint[(i++)&0xf] = a[0];
000596        }while( (a[0]&0x80)!=0 );
000597        sqlite3GetVarint(aVarint, pnOut);
000598      }
000599    }
000600  
000601    return SQLITE_OK;
000602  }
000603  
000604  /*
000605  ** Attempt to memory map file pFile. If successful, set *pp to point to the
000606  ** new mapping and return SQLITE_OK. If the mapping is not attempted 
000607  ** (because the file is too large or the VFS layer is configured not to use
000608  ** mmap), return SQLITE_OK and set *pp to NULL.
000609  **
000610  ** Or, if an error occurs, return an SQLite error code. The final value of
000611  ** *pp is undefined in this case.
000612  */
000613  static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
000614    int rc = SQLITE_OK;
000615    if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
000616      sqlite3_file *pFd = pFile->pFd;
000617      if( pFd->pMethods->iVersion>=3 ){
000618        rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
000619        testcase( rc!=SQLITE_OK );
000620      }
000621    }
000622    return rc;
000623  }
000624  
000625  /*
000626  ** Attach PmaReader pReadr to file pFile (if it is not already attached to
000627  ** that file) and seek it to offset iOff within the file.  Return SQLITE_OK 
000628  ** if successful, or an SQLite error code if an error occurs.
000629  */
000630  static int vdbePmaReaderSeek(
000631    SortSubtask *pTask,             /* Task context */
000632    PmaReader *pReadr,              /* Reader whose cursor is to be moved */
000633    SorterFile *pFile,              /* Sorter file to read from */
000634    i64 iOff                        /* Offset in pFile */
000635  ){
000636    int rc = SQLITE_OK;
000637  
000638    assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
000639  
000640    if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
000641    if( pReadr->aMap ){
000642      sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
000643      pReadr->aMap = 0;
000644    }
000645    pReadr->iReadOff = iOff;
000646    pReadr->iEof = pFile->iEof;
000647    pReadr->pFd = pFile->pFd;
000648  
000649    rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
000650    if( rc==SQLITE_OK && pReadr->aMap==0 ){
000651      int pgsz = pTask->pSorter->pgsz;
000652      int iBuf = pReadr->iReadOff % pgsz;
000653      if( pReadr->aBuffer==0 ){
000654        pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
000655        if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT;
000656        pReadr->nBuffer = pgsz;
000657      }
000658      if( rc==SQLITE_OK && iBuf ){
000659        int nRead = pgsz - iBuf;
000660        if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
000661          nRead = (int)(pReadr->iEof - pReadr->iReadOff);
000662        }
000663        rc = sqlite3OsRead(
000664            pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
000665        );
000666        testcase( rc!=SQLITE_OK );
000667      }
000668    }
000669  
000670    return rc;
000671  }
000672  
000673  /*
000674  ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
000675  ** no error occurs, or an SQLite error code if one does.
000676  */
000677  static int vdbePmaReaderNext(PmaReader *pReadr){
000678    int rc = SQLITE_OK;             /* Return Code */
000679    u64 nRec = 0;                   /* Size of record in bytes */
000680  
000681  
000682    if( pReadr->iReadOff>=pReadr->iEof ){
000683      IncrMerger *pIncr = pReadr->pIncr;
000684      int bEof = 1;
000685      if( pIncr ){
000686        rc = vdbeIncrSwap(pIncr);
000687        if( rc==SQLITE_OK && pIncr->bEof==0 ){
000688          rc = vdbePmaReaderSeek(
000689              pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
000690          );
000691          bEof = 0;
000692        }
000693      }
000694  
000695      if( bEof ){
000696        /* This is an EOF condition */
000697        vdbePmaReaderClear(pReadr);
000698        testcase( rc!=SQLITE_OK );
000699        return rc;
000700      }
000701    }
000702  
000703    if( rc==SQLITE_OK ){
000704      rc = vdbePmaReadVarint(pReadr, &nRec);
000705    }
000706    if( rc==SQLITE_OK ){
000707      pReadr->nKey = (int)nRec;
000708      rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
000709      testcase( rc!=SQLITE_OK );
000710    }
000711  
000712    return rc;
000713  }
000714  
000715  /*
000716  ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
000717  ** starting at offset iStart and ending at offset iEof-1. This function 
000718  ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the 
000719  ** PMA is empty).
000720  **
000721  ** If the pnByte parameter is NULL, then it is assumed that the file 
000722  ** contains a single PMA, and that that PMA omits the initial length varint.
000723  */
000724  static int vdbePmaReaderInit(
000725    SortSubtask *pTask,             /* Task context */
000726    SorterFile *pFile,              /* Sorter file to read from */
000727    i64 iStart,                     /* Start offset in pFile */
000728    PmaReader *pReadr,              /* PmaReader to populate */
000729    i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
000730  ){
000731    int rc;
000732  
000733    assert( pFile->iEof>iStart );
000734    assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
000735    assert( pReadr->aBuffer==0 );
000736    assert( pReadr->aMap==0 );
000737  
000738    rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
000739    if( rc==SQLITE_OK ){
000740      u64 nByte = 0;                 /* Size of PMA in bytes */
000741      rc = vdbePmaReadVarint(pReadr, &nByte);
000742      pReadr->iEof = pReadr->iReadOff + nByte;
000743      *pnByte += nByte;
000744    }
000745  
000746    if( rc==SQLITE_OK ){
000747      rc = vdbePmaReaderNext(pReadr);
000748    }
000749    return rc;
000750  }
000751  
000752  /*
000753  ** A version of vdbeSorterCompare() that assumes that it has already been
000754  ** determined that the first field of key1 is equal to the first field of 
000755  ** key2.
000756  */
000757  static int vdbeSorterCompareTail(
000758    SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
000759    int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
000760    const void *pKey1, int nKey1,   /* Left side of comparison */
000761    const void *pKey2, int nKey2    /* Right side of comparison */
000762  ){
000763    UnpackedRecord *r2 = pTask->pUnpacked;
000764    if( *pbKey2Cached==0 ){
000765      sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
000766      *pbKey2Cached = 1;
000767    }
000768    return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1);
000769  }
000770  
000771  /*
000772  ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 
000773  ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
000774  ** used by the comparison. Return the result of the comparison.
000775  **
000776  ** If IN/OUT parameter *pbKey2Cached is true when this function is called,
000777  ** it is assumed that (pTask->pUnpacked) contains the unpacked version
000778  ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
000779  ** version of key2 and *pbKey2Cached set to true before returning.
000780  **
000781  ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
000782  ** to SQLITE_NOMEM.
000783  */
000784  static int vdbeSorterCompare(
000785    SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
000786    int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
000787    const void *pKey1, int nKey1,   /* Left side of comparison */
000788    const void *pKey2, int nKey2    /* Right side of comparison */
000789  ){
000790    UnpackedRecord *r2 = pTask->pUnpacked;
000791    if( !*pbKey2Cached ){
000792      sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
000793      *pbKey2Cached = 1;
000794    }
000795    return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
000796  }
000797  
000798  /*
000799  ** A specially optimized version of vdbeSorterCompare() that assumes that
000800  ** the first field of each key is a TEXT value and that the collation
000801  ** sequence to compare them with is BINARY.
000802  */
000803  static int vdbeSorterCompareText(
000804    SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
000805    int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
000806    const void *pKey1, int nKey1,   /* Left side of comparison */
000807    const void *pKey2, int nKey2    /* Right side of comparison */
000808  ){
000809    const u8 * const p1 = (const u8 * const)pKey1;
000810    const u8 * const p2 = (const u8 * const)pKey2;
000811    const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
000812    const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
000813  
000814    int n1;
000815    int n2;
000816    int res;
000817  
000818    getVarint32(&p1[1], n1); n1 = (n1 - 13) / 2;
000819    getVarint32(&p2[1], n2); n2 = (n2 - 13) / 2;
000820    res = memcmp(v1, v2, MIN(n1, n2));
000821    if( res==0 ){
000822      res = n1 - n2;
000823    }
000824  
000825    if( res==0 ){
000826      if( pTask->pSorter->pKeyInfo->nField>1 ){
000827        res = vdbeSorterCompareTail(
000828            pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
000829        );
000830      }
000831    }else{
000832      if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){
000833        res = res * -1;
000834      }
000835    }
000836  
000837    return res;
000838  }
000839  
000840  /*
000841  ** A specially optimized version of vdbeSorterCompare() that assumes that
000842  ** the first field of each key is an INTEGER value.
000843  */
000844  static int vdbeSorterCompareInt(
000845    SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
000846    int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
000847    const void *pKey1, int nKey1,   /* Left side of comparison */
000848    const void *pKey2, int nKey2    /* Right side of comparison */
000849  ){
000850    const u8 * const p1 = (const u8 * const)pKey1;
000851    const u8 * const p2 = (const u8 * const)pKey2;
000852    const int s1 = p1[1];                 /* Left hand serial type */
000853    const int s2 = p2[1];                 /* Right hand serial type */
000854    const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
000855    const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
000856    int res;                              /* Return value */
000857  
000858    assert( (s1>0 && s1<7) || s1==8 || s1==9 );
000859    assert( (s2>0 && s2<7) || s2==8 || s2==9 );
000860  
000861    if( s1>7 && s2>7 ){
000862      res = s1 - s2;
000863    }else{
000864      if( s1==s2 ){
000865        if( (*v1 ^ *v2) & 0x80 ){
000866          /* The two values have different signs */
000867          res = (*v1 & 0x80) ? -1 : +1;
000868        }else{
000869          /* The two values have the same sign. Compare using memcmp(). */
000870          static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8 };
000871          int i;
000872          res = 0;
000873          for(i=0; i<aLen[s1]; i++){
000874            if( (res = v1[i] - v2[i]) ) break;
000875          }
000876        }
000877      }else{
000878        if( s2>7 ){
000879          res = +1;
000880        }else if( s1>7 ){
000881          res = -1;
000882        }else{
000883          res = s1 - s2;
000884        }
000885        assert( res!=0 );
000886  
000887        if( res>0 ){
000888          if( *v1 & 0x80 ) res = -1;
000889        }else{
000890          if( *v2 & 0x80 ) res = +1;
000891        }
000892      }
000893    }
000894  
000895    if( res==0 ){
000896      if( pTask->pSorter->pKeyInfo->nField>1 ){
000897        res = vdbeSorterCompareTail(
000898            pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
000899        );
000900      }
000901    }else if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){
000902      res = res * -1;
000903    }
000904  
000905    return res;
000906  }
000907  
000908  /*
000909  ** Initialize the temporary index cursor just opened as a sorter cursor.
000910  **
000911  ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField)
000912  ** to determine the number of fields that should be compared from the
000913  ** records being sorted. However, if the value passed as argument nField
000914  ** is non-zero and the sorter is able to guarantee a stable sort, nField
000915  ** is used instead. This is used when sorting records for a CREATE INDEX
000916  ** statement. In this case, keys are always delivered to the sorter in
000917  ** order of the primary key, which happens to be make up the final part 
000918  ** of the records being sorted. So if the sort is stable, there is never
000919  ** any reason to compare PK fields and they can be ignored for a small
000920  ** performance boost.
000921  **
000922  ** The sorter can guarantee a stable sort when running in single-threaded
000923  ** mode, but not in multi-threaded mode.
000924  **
000925  ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
000926  */
000927  int sqlite3VdbeSorterInit(
000928    sqlite3 *db,                    /* Database connection (for malloc()) */
000929    int nField,                     /* Number of key fields in each record */
000930    VdbeCursor *pCsr                /* Cursor that holds the new sorter */
000931  ){
000932    int pgsz;                       /* Page size of main database */
000933    int i;                          /* Used to iterate through aTask[] */
000934    VdbeSorter *pSorter;            /* The new sorter */
000935    KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
000936    int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
000937    int sz;                         /* Size of pSorter in bytes */
000938    int rc = SQLITE_OK;
000939  #if SQLITE_MAX_WORKER_THREADS==0
000940  # define nWorker 0
000941  #else
000942    int nWorker;
000943  #endif
000944  
000945    /* Initialize the upper limit on the number of worker threads */
000946  #if SQLITE_MAX_WORKER_THREADS>0
000947    if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
000948      nWorker = 0;
000949    }else{
000950      nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
000951    }
000952  #endif
000953  
000954    /* Do not allow the total number of threads (main thread + all workers)
000955    ** to exceed the maximum merge count */
000956  #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
000957    if( nWorker>=SORTER_MAX_MERGE_COUNT ){
000958      nWorker = SORTER_MAX_MERGE_COUNT-1;
000959    }
000960  #endif
000961  
000962    assert( pCsr->pKeyInfo && pCsr->pBtx==0 );
000963    assert( pCsr->eCurType==CURTYPE_SORTER );
000964    szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
000965    sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
000966  
000967    pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
000968    pCsr->uc.pSorter = pSorter;
000969    if( pSorter==0 ){
000970      rc = SQLITE_NOMEM_BKPT;
000971    }else{
000972      pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
000973      memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
000974      pKeyInfo->db = 0;
000975      if( nField && nWorker==0 ){
000976        pKeyInfo->nXField += (pKeyInfo->nField - nField);
000977        pKeyInfo->nField = nField;
000978      }
000979      pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
000980      pSorter->nTask = nWorker + 1;
000981      pSorter->iPrev = (u8)(nWorker - 1);
000982      pSorter->bUseThreads = (pSorter->nTask>1);
000983      pSorter->db = db;
000984      for(i=0; i<pSorter->nTask; i++){
000985        SortSubtask *pTask = &pSorter->aTask[i];
000986        pTask->pSorter = pSorter;
000987      }
000988  
000989      if( !sqlite3TempInMemory(db) ){
000990        i64 mxCache;                /* Cache size in bytes*/
000991        u32 szPma = sqlite3GlobalConfig.szPma;
000992        pSorter->mnPmaSize = szPma * pgsz;
000993  
000994        mxCache = db->aDb[0].pSchema->cache_size;
000995        if( mxCache<0 ){
000996          /* A negative cache-size value C indicates that the cache is abs(C)
000997          ** KiB in size.  */
000998          mxCache = mxCache * -1024;
000999        }else{
001000          mxCache = mxCache * pgsz;
001001        }
001002        mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
001003        pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);
001004  
001005        /* EVIDENCE-OF: R-26747-61719 When the application provides any amount of
001006        ** scratch memory using SQLITE_CONFIG_SCRATCH, SQLite avoids unnecessary
001007        ** large heap allocations.
001008        */
001009        if( sqlite3GlobalConfig.pScratch==0 ){
001010          assert( pSorter->iMemory==0 );
001011          pSorter->nMemory = pgsz;
001012          pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
001013          if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
001014        }
001015      }
001016  
001017      if( (pKeyInfo->nField+pKeyInfo->nXField)<13 
001018       && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
001019      ){
001020        pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
001021      }
001022    }
001023  
001024    return rc;
001025  }
001026  #undef nWorker   /* Defined at the top of this function */
001027  
001028  /*
001029  ** Free the list of sorted records starting at pRecord.
001030  */
001031  static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
001032    SorterRecord *p;
001033    SorterRecord *pNext;
001034    for(p=pRecord; p; p=pNext){
001035      pNext = p->u.pNext;
001036      sqlite3DbFree(db, p);
001037    }
001038  }
001039  
001040  /*
001041  ** Free all resources owned by the object indicated by argument pTask. All 
001042  ** fields of *pTask are zeroed before returning.
001043  */
001044  static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
001045    sqlite3DbFree(db, pTask->pUnpacked);
001046  #if SQLITE_MAX_WORKER_THREADS>0
001047    /* pTask->list.aMemory can only be non-zero if it was handed memory
001048    ** from the main thread.  That only occurs SQLITE_MAX_WORKER_THREADS>0 */
001049    if( pTask->list.aMemory ){
001050      sqlite3_free(pTask->list.aMemory);
001051    }else
001052  #endif
001053    {
001054      assert( pTask->list.aMemory==0 );
001055      vdbeSorterRecordFree(0, pTask->list.pList);
001056    }
001057    if( pTask->file.pFd ){
001058      sqlite3OsCloseFree(pTask->file.pFd);
001059    }
001060    if( pTask->file2.pFd ){
001061      sqlite3OsCloseFree(pTask->file2.pFd);
001062    }
001063    memset(pTask, 0, sizeof(SortSubtask));
001064  }
001065  
001066  #ifdef SQLITE_DEBUG_SORTER_THREADS
001067  static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
001068    i64 t;
001069    int iTask = (pTask - pTask->pSorter->aTask);
001070    sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
001071    fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
001072  }
001073  static void vdbeSorterRewindDebug(const char *zEvent){
001074    i64 t;
001075    sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t);
001076    fprintf(stderr, "%lld:X %s\n", t, zEvent);
001077  }
001078  static void vdbeSorterPopulateDebug(
001079    SortSubtask *pTask,
001080    const char *zEvent
001081  ){
001082    i64 t;
001083    int iTask = (pTask - pTask->pSorter->aTask);
001084    sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
001085    fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
001086  }
001087  static void vdbeSorterBlockDebug(
001088    SortSubtask *pTask,
001089    int bBlocked,
001090    const char *zEvent
001091  ){
001092    if( bBlocked ){
001093      i64 t;
001094      sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
001095      fprintf(stderr, "%lld:main %s\n", t, zEvent);
001096    }
001097  }
001098  #else
001099  # define vdbeSorterWorkDebug(x,y)
001100  # define vdbeSorterRewindDebug(y)
001101  # define vdbeSorterPopulateDebug(x,y)
001102  # define vdbeSorterBlockDebug(x,y,z)
001103  #endif
001104  
001105  #if SQLITE_MAX_WORKER_THREADS>0
001106  /*
001107  ** Join thread pTask->thread.
001108  */
001109  static int vdbeSorterJoinThread(SortSubtask *pTask){
001110    int rc = SQLITE_OK;
001111    if( pTask->pThread ){
001112  #ifdef SQLITE_DEBUG_SORTER_THREADS
001113      int bDone = pTask->bDone;
001114  #endif
001115      void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
001116      vdbeSorterBlockDebug(pTask, !bDone, "enter");
001117      (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
001118      vdbeSorterBlockDebug(pTask, !bDone, "exit");
001119      rc = SQLITE_PTR_TO_INT(pRet);
001120      assert( pTask->bDone==1 );
001121      pTask->bDone = 0;
001122      pTask->pThread = 0;
001123    }
001124    return rc;
001125  }
001126  
001127  /*
001128  ** Launch a background thread to run xTask(pIn).
001129  */
001130  static int vdbeSorterCreateThread(
001131    SortSubtask *pTask,             /* Thread will use this task object */
001132    void *(*xTask)(void*),          /* Routine to run in a separate thread */
001133    void *pIn                       /* Argument passed into xTask() */
001134  ){
001135    assert( pTask->pThread==0 && pTask->bDone==0 );
001136    return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
001137  }
001138  
001139  /*
001140  ** Join all outstanding threads launched by SorterWrite() to create 
001141  ** level-0 PMAs.
001142  */
001143  static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
001144    int rc = rcin;
001145    int i;
001146  
001147    /* This function is always called by the main user thread.
001148    **
001149    ** If this function is being called after SorterRewind() has been called, 
001150    ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
001151    ** is currently attempt to join one of the other threads. To avoid a race
001152    ** condition where this thread also attempts to join the same object, join 
001153    ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
001154    for(i=pSorter->nTask-1; i>=0; i--){
001155      SortSubtask *pTask = &pSorter->aTask[i];
001156      int rc2 = vdbeSorterJoinThread(pTask);
001157      if( rc==SQLITE_OK ) rc = rc2;
001158    }
001159    return rc;
001160  }
001161  #else
001162  # define vdbeSorterJoinAll(x,rcin) (rcin)
001163  # define vdbeSorterJoinThread(pTask) SQLITE_OK
001164  #endif
001165  
001166  /*
001167  ** Allocate a new MergeEngine object capable of handling up to
001168  ** nReader PmaReader inputs.
001169  **
001170  ** nReader is automatically rounded up to the next power of two.
001171  ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
001172  */
001173  static MergeEngine *vdbeMergeEngineNew(int nReader){
001174    int N = 2;                      /* Smallest power of two >= nReader */
001175    int nByte;                      /* Total bytes of space to allocate */
001176    MergeEngine *pNew;              /* Pointer to allocated object to return */
001177  
001178    assert( nReader<=SORTER_MAX_MERGE_COUNT );
001179  
001180    while( N<nReader ) N += N;
001181    nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
001182  
001183    pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
001184    if( pNew ){
001185      pNew->nTree = N;
001186      pNew->pTask = 0;
001187      pNew->aReadr = (PmaReader*)&pNew[1];
001188      pNew->aTree = (int*)&pNew->aReadr[N];
001189    }
001190    return pNew;
001191  }
001192  
001193  /*
001194  ** Free the MergeEngine object passed as the only argument.
001195  */
001196  static void vdbeMergeEngineFree(MergeEngine *pMerger){
001197    int i;
001198    if( pMerger ){
001199      for(i=0; i<pMerger->nTree; i++){
001200        vdbePmaReaderClear(&pMerger->aReadr[i]);
001201      }
001202    }
001203    sqlite3_free(pMerger);
001204  }
001205  
001206  /*
001207  ** Free all resources associated with the IncrMerger object indicated by
001208  ** the first argument.
001209  */
001210  static void vdbeIncrFree(IncrMerger *pIncr){
001211    if( pIncr ){
001212  #if SQLITE_MAX_WORKER_THREADS>0
001213      if( pIncr->bUseThread ){
001214        vdbeSorterJoinThread(pIncr->pTask);
001215        if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
001216        if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
001217      }
001218  #endif
001219      vdbeMergeEngineFree(pIncr->pMerger);
001220      sqlite3_free(pIncr);
001221    }
001222  }
001223  
001224  /*
001225  ** Reset a sorting cursor back to its original empty state.
001226  */
001227  void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
001228    int i;
001229    (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
001230    assert( pSorter->bUseThreads || pSorter->pReader==0 );
001231  #if SQLITE_MAX_WORKER_THREADS>0
001232    if( pSorter->pReader ){
001233      vdbePmaReaderClear(pSorter->pReader);
001234      sqlite3DbFree(db, pSorter->pReader);
001235      pSorter->pReader = 0;
001236    }
001237  #endif
001238    vdbeMergeEngineFree(pSorter->pMerger);
001239    pSorter->pMerger = 0;
001240    for(i=0; i<pSorter->nTask; i++){
001241      SortSubtask *pTask = &pSorter->aTask[i];
001242      vdbeSortSubtaskCleanup(db, pTask);
001243      pTask->pSorter = pSorter;
001244    }
001245    if( pSorter->list.aMemory==0 ){
001246      vdbeSorterRecordFree(0, pSorter->list.pList);
001247    }
001248    pSorter->list.pList = 0;
001249    pSorter->list.szPMA = 0;
001250    pSorter->bUsePMA = 0;
001251    pSorter->iMemory = 0;
001252    pSorter->mxKeysize = 0;
001253    sqlite3DbFree(db, pSorter->pUnpacked);
001254    pSorter->pUnpacked = 0;
001255  }
001256  
001257  /*
001258  ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
001259  */
001260  void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
001261    VdbeSorter *pSorter;
001262    assert( pCsr->eCurType==CURTYPE_SORTER );
001263    pSorter = pCsr->uc.pSorter;
001264    if( pSorter ){
001265      sqlite3VdbeSorterReset(db, pSorter);
001266      sqlite3_free(pSorter->list.aMemory);
001267      sqlite3DbFree(db, pSorter);
001268      pCsr->uc.pSorter = 0;
001269    }
001270  }
001271  
001272  #if SQLITE_MAX_MMAP_SIZE>0
001273  /*
001274  ** The first argument is a file-handle open on a temporary file. The file
001275  ** is guaranteed to be nByte bytes or smaller in size. This function
001276  ** attempts to extend the file to nByte bytes in size and to ensure that
001277  ** the VFS has memory mapped it.
001278  **
001279  ** Whether or not the file does end up memory mapped of course depends on
001280  ** the specific VFS implementation.
001281  */
001282  static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
001283    if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){
001284      void *p = 0;
001285      int chunksize = 4*1024;
001286      sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize);
001287      sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
001288      sqlite3OsFetch(pFd, 0, (int)nByte, &p);
001289      sqlite3OsUnfetch(pFd, 0, p);
001290    }
001291  }
001292  #else
001293  # define vdbeSorterExtendFile(x,y,z)
001294  #endif
001295  
001296  /*
001297  ** Allocate space for a file-handle and open a temporary file. If successful,
001298  ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
001299  ** Otherwise, set *ppFd to 0 and return an SQLite error code.
001300  */
001301  static int vdbeSorterOpenTempFile(
001302    sqlite3 *db,                    /* Database handle doing sort */
001303    i64 nExtend,                    /* Attempt to extend file to this size */
001304    sqlite3_file **ppFd
001305  ){
001306    int rc;
001307    if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
001308    rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
001309        SQLITE_OPEN_TEMP_JOURNAL |
001310        SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
001311        SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &rc
001312    );
001313    if( rc==SQLITE_OK ){
001314      i64 max = SQLITE_MAX_MMAP_SIZE;
001315      sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
001316      if( nExtend>0 ){
001317        vdbeSorterExtendFile(db, *ppFd, nExtend);
001318      }
001319    }
001320    return rc;
001321  }
001322  
001323  /*
001324  ** If it has not already been allocated, allocate the UnpackedRecord 
001325  ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 
001326  ** if no allocation was required), or SQLITE_NOMEM otherwise.
001327  */
001328  static int vdbeSortAllocUnpacked(SortSubtask *pTask){
001329    if( pTask->pUnpacked==0 ){
001330      pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo);
001331      if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT;
001332      pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
001333      pTask->pUnpacked->errCode = 0;
001334    }
001335    return SQLITE_OK;
001336  }
001337  
001338  
001339  /*
001340  ** Merge the two sorted lists p1 and p2 into a single list.
001341  */
001342  static SorterRecord *vdbeSorterMerge(
001343    SortSubtask *pTask,             /* Calling thread context */
001344    SorterRecord *p1,               /* First list to merge */
001345    SorterRecord *p2                /* Second list to merge */
001346  ){
001347    SorterRecord *pFinal = 0;
001348    SorterRecord **pp = &pFinal;
001349    int bCached = 0;
001350  
001351    assert( p1!=0 && p2!=0 );
001352    for(;;){
001353      int res;
001354      res = pTask->xCompare(
001355          pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
001356      );
001357  
001358      if( res<=0 ){
001359        *pp = p1;
001360        pp = &p1->u.pNext;
001361        p1 = p1->u.pNext;
001362        if( p1==0 ){
001363          *pp = p2;
001364          break;
001365        }
001366      }else{
001367        *pp = p2;
001368        pp = &p2->u.pNext;
001369        p2 = p2->u.pNext;
001370        bCached = 0;
001371        if( p2==0 ){
001372          *pp = p1;
001373          break;
001374        }
001375      }
001376    }
001377    return pFinal;
001378  }
001379  
001380  /*
001381  ** Return the SorterCompare function to compare values collected by the
001382  ** sorter object passed as the only argument.
001383  */
001384  static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
001385    if( p->typeMask==SORTER_TYPE_INTEGER ){
001386      return vdbeSorterCompareInt;
001387    }else if( p->typeMask==SORTER_TYPE_TEXT ){
001388      return vdbeSorterCompareText; 
001389    }
001390    return vdbeSorterCompare;
001391  }
001392  
001393  /*
001394  ** Sort the linked list of records headed at pTask->pList. Return 
001395  ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
001396  ** an error occurs.
001397  */
001398  static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
001399    int i;
001400    SorterRecord **aSlot;
001401    SorterRecord *p;
001402    int rc;
001403  
001404    rc = vdbeSortAllocUnpacked(pTask);
001405    if( rc!=SQLITE_OK ) return rc;
001406  
001407    p = pList->pList;
001408    pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
001409  
001410    aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
001411    if( !aSlot ){
001412      return SQLITE_NOMEM_BKPT;
001413    }
001414  
001415    while( p ){
001416      SorterRecord *pNext;
001417      if( pList->aMemory ){
001418        if( (u8*)p==pList->aMemory ){
001419          pNext = 0;
001420        }else{
001421          assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
001422          pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
001423        }
001424      }else{
001425        pNext = p->u.pNext;
001426      }
001427  
001428      p->u.pNext = 0;
001429      for(i=0; aSlot[i]; i++){
001430        p = vdbeSorterMerge(pTask, p, aSlot[i]);
001431        aSlot[i] = 0;
001432      }
001433      aSlot[i] = p;
001434      p = pNext;
001435    }
001436  
001437    p = 0;
001438    for(i=0; i<64; i++){
001439      if( aSlot[i]==0 ) continue;
001440      p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
001441    }
001442    pList->pList = p;
001443  
001444    sqlite3_free(aSlot);
001445    assert( pTask->pUnpacked->errCode==SQLITE_OK 
001446         || pTask->pUnpacked->errCode==SQLITE_NOMEM 
001447    );
001448    return pTask->pUnpacked->errCode;
001449  }
001450  
001451  /*
001452  ** Initialize a PMA-writer object.
001453  */
001454  static void vdbePmaWriterInit(
001455    sqlite3_file *pFd,              /* File handle to write to */
001456    PmaWriter *p,                   /* Object to populate */
001457    int nBuf,                       /* Buffer size */
001458    i64 iStart                      /* Offset of pFd to begin writing at */
001459  ){
001460    memset(p, 0, sizeof(PmaWriter));
001461    p->aBuffer = (u8*)sqlite3Malloc(nBuf);
001462    if( !p->aBuffer ){
001463      p->eFWErr = SQLITE_NOMEM_BKPT;
001464    }else{
001465      p->iBufEnd = p->iBufStart = (iStart % nBuf);
001466      p->iWriteOff = iStart - p->iBufStart;
001467      p->nBuffer = nBuf;
001468      p->pFd = pFd;
001469    }
001470  }
001471  
001472  /*
001473  ** Write nData bytes of data to the PMA. Return SQLITE_OK
001474  ** if successful, or an SQLite error code if an error occurs.
001475  */
001476  static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
001477    int nRem = nData;
001478    while( nRem>0 && p->eFWErr==0 ){
001479      int nCopy = nRem;
001480      if( nCopy>(p->nBuffer - p->iBufEnd) ){
001481        nCopy = p->nBuffer - p->iBufEnd;
001482      }
001483  
001484      memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
001485      p->iBufEnd += nCopy;
001486      if( p->iBufEnd==p->nBuffer ){
001487        p->eFWErr = sqlite3OsWrite(p->pFd, 
001488            &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 
001489            p->iWriteOff + p->iBufStart
001490        );
001491        p->iBufStart = p->iBufEnd = 0;
001492        p->iWriteOff += p->nBuffer;
001493      }
001494      assert( p->iBufEnd<p->nBuffer );
001495  
001496      nRem -= nCopy;
001497    }
001498  }
001499  
001500  /*
001501  ** Flush any buffered data to disk and clean up the PMA-writer object.
001502  ** The results of using the PMA-writer after this call are undefined.
001503  ** Return SQLITE_OK if flushing the buffered data succeeds or is not 
001504  ** required. Otherwise, return an SQLite error code.
001505  **
001506  ** Before returning, set *piEof to the offset immediately following the
001507  ** last byte written to the file.
001508  */
001509  static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
001510    int rc;
001511    if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
001512      p->eFWErr = sqlite3OsWrite(p->pFd, 
001513          &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 
001514          p->iWriteOff + p->iBufStart
001515      );
001516    }
001517    *piEof = (p->iWriteOff + p->iBufEnd);
001518    sqlite3_free(p->aBuffer);
001519    rc = p->eFWErr;
001520    memset(p, 0, sizeof(PmaWriter));
001521    return rc;
001522  }
001523  
001524  /*
001525  ** Write value iVal encoded as a varint to the PMA. Return 
001526  ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
001527  */
001528  static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
001529    int nByte; 
001530    u8 aByte[10];
001531    nByte = sqlite3PutVarint(aByte, iVal);
001532    vdbePmaWriteBlob(p, aByte, nByte);
001533  }
001534  
001535  /*
001536  ** Write the current contents of in-memory linked-list pList to a level-0
001537  ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if 
001538  ** successful, or an SQLite error code otherwise.
001539  **
001540  ** The format of a PMA is:
001541  **
001542  **     * A varint. This varint contains the total number of bytes of content
001543  **       in the PMA (not including the varint itself).
001544  **
001545  **     * One or more records packed end-to-end in order of ascending keys. 
001546  **       Each record consists of a varint followed by a blob of data (the 
001547  **       key). The varint is the number of bytes in the blob of data.
001548  */
001549  static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
001550    sqlite3 *db = pTask->pSorter->db;
001551    int rc = SQLITE_OK;             /* Return code */
001552    PmaWriter writer;               /* Object used to write to the file */
001553  
001554  #ifdef SQLITE_DEBUG
001555    /* Set iSz to the expected size of file pTask->file after writing the PMA. 
001556    ** This is used by an assert() statement at the end of this function.  */
001557    i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
001558  #endif
001559  
001560    vdbeSorterWorkDebug(pTask, "enter");
001561    memset(&writer, 0, sizeof(PmaWriter));
001562    assert( pList->szPMA>0 );
001563  
001564    /* If the first temporary PMA file has not been opened, open it now. */
001565    if( pTask->file.pFd==0 ){
001566      rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
001567      assert( rc!=SQLITE_OK || pTask->file.pFd );
001568      assert( pTask->file.iEof==0 );
001569      assert( pTask->nPMA==0 );
001570    }
001571  
001572    /* Try to get the file to memory map */
001573    if( rc==SQLITE_OK ){
001574      vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
001575    }
001576  
001577    /* Sort the list */
001578    if( rc==SQLITE_OK ){
001579      rc = vdbeSorterSort(pTask, pList);
001580    }
001581  
001582    if( rc==SQLITE_OK ){
001583      SorterRecord *p;
001584      SorterRecord *pNext = 0;
001585  
001586      vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
001587                        pTask->file.iEof);
001588      pTask->nPMA++;
001589      vdbePmaWriteVarint(&writer, pList->szPMA);
001590      for(p=pList->pList; p; p=pNext){
001591        pNext = p->u.pNext;
001592        vdbePmaWriteVarint(&writer, p->nVal);
001593        vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
001594        if( pList->aMemory==0 ) sqlite3_free(p);
001595      }
001596      pList->pList = p;
001597      rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
001598    }
001599  
001600    vdbeSorterWorkDebug(pTask, "exit");
001601    assert( rc!=SQLITE_OK || pList->pList==0 );
001602    assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
001603    return rc;
001604  }
001605  
001606  /*
001607  ** Advance the MergeEngine to its next entry.
001608  ** Set *pbEof to true there is no next entry because
001609  ** the MergeEngine has reached the end of all its inputs.
001610  **
001611  ** Return SQLITE_OK if successful or an error code if an error occurs.
001612  */
001613  static int vdbeMergeEngineStep(
001614    MergeEngine *pMerger,      /* The merge engine to advance to the next row */
001615    int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
001616  ){
001617    int rc;
001618    int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
001619    SortSubtask *pTask = pMerger->pTask;
001620  
001621    /* Advance the current PmaReader */
001622    rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
001623  
001624    /* Update contents of aTree[] */
001625    if( rc==SQLITE_OK ){
001626      int i;                      /* Index of aTree[] to recalculate */
001627      PmaReader *pReadr1;         /* First PmaReader to compare */
001628      PmaReader *pReadr2;         /* Second PmaReader to compare */
001629      int bCached = 0;
001630  
001631      /* Find the first two PmaReaders to compare. The one that was just
001632      ** advanced (iPrev) and the one next to it in the array.  */
001633      pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
001634      pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
001635  
001636      for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
001637        /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
001638        int iRes;
001639        if( pReadr1->pFd==0 ){
001640          iRes = +1;
001641        }else if( pReadr2->pFd==0 ){
001642          iRes = -1;
001643        }else{
001644          iRes = pTask->xCompare(pTask, &bCached,
001645              pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
001646          );
001647        }
001648  
001649        /* If pReadr1 contained the smaller value, set aTree[i] to its index.
001650        ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
001651        ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
001652        ** pKey2 to point to the record belonging to pReadr2.
001653        **
001654        ** Alternatively, if pReadr2 contains the smaller of the two values,
001655        ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
001656        ** was actually called above, then pTask->pUnpacked now contains
001657        ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
001658        ** vdbeSorterCompare() from decoding pReadr2 again.
001659        **
001660        ** If the two values were equal, then the value from the oldest
001661        ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
001662        ** is sorted from oldest to newest, so pReadr1 contains older values
001663        ** than pReadr2 iff (pReadr1<pReadr2).  */
001664        if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
001665          pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
001666          pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
001667          bCached = 0;
001668        }else{
001669          if( pReadr1->pFd ) bCached = 0;
001670          pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
001671          pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
001672        }
001673      }
001674      *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
001675    }
001676  
001677    return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
001678  }
001679  
001680  #if SQLITE_MAX_WORKER_THREADS>0
001681  /*
001682  ** The main routine for background threads that write level-0 PMAs.
001683  */
001684  static void *vdbeSorterFlushThread(void *pCtx){
001685    SortSubtask *pTask = (SortSubtask*)pCtx;
001686    int rc;                         /* Return code */
001687    assert( pTask->bDone==0 );
001688    rc = vdbeSorterListToPMA(pTask, &pTask->list);
001689    pTask->bDone = 1;
001690    return SQLITE_INT_TO_PTR(rc);
001691  }
001692  #endif /* SQLITE_MAX_WORKER_THREADS>0 */
001693  
001694  /*
001695  ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
001696  ** using a background thread.
001697  */
001698  static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
001699  #if SQLITE_MAX_WORKER_THREADS==0
001700    pSorter->bUsePMA = 1;
001701    return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
001702  #else
001703    int rc = SQLITE_OK;
001704    int i;
001705    SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
001706    int nWorker = (pSorter->nTask-1);
001707  
001708    /* Set the flag to indicate that at least one PMA has been written. 
001709    ** Or will be, anyhow.  */
001710    pSorter->bUsePMA = 1;
001711  
001712    /* Select a sub-task to sort and flush the current list of in-memory
001713    ** records to disk. If the sorter is running in multi-threaded mode,
001714    ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
001715    ** the background thread from a sub-tasks previous turn is still running,
001716    ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
001717    ** fall back to using the final sub-task. The first (pSorter->nTask-1)
001718    ** sub-tasks are prefered as they use background threads - the final 
001719    ** sub-task uses the main thread. */
001720    for(i=0; i<nWorker; i++){
001721      int iTest = (pSorter->iPrev + i + 1) % nWorker;
001722      pTask = &pSorter->aTask[iTest];
001723      if( pTask->bDone ){
001724        rc = vdbeSorterJoinThread(pTask);
001725      }
001726      if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
001727    }
001728  
001729    if( rc==SQLITE_OK ){
001730      if( i==nWorker ){
001731        /* Use the foreground thread for this operation */
001732        rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
001733      }else{
001734        /* Launch a background thread for this operation */
001735        u8 *aMem = pTask->list.aMemory;
001736        void *pCtx = (void*)pTask;
001737  
001738        assert( pTask->pThread==0 && pTask->bDone==0 );
001739        assert( pTask->list.pList==0 );
001740        assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
001741  
001742        pSorter->iPrev = (u8)(pTask - pSorter->aTask);
001743        pTask->list = pSorter->list;
001744        pSorter->list.pList = 0;
001745        pSorter->list.szPMA = 0;
001746        if( aMem ){
001747          pSorter->list.aMemory = aMem;
001748          pSorter->nMemory = sqlite3MallocSize(aMem);
001749        }else if( pSorter->list.aMemory ){
001750          pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
001751          if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
001752        }
001753  
001754        rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
001755      }
001756    }
001757  
001758    return rc;
001759  #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
001760  }
001761  
001762  /*
001763  ** Add a record to the sorter.
001764  */
001765  int sqlite3VdbeSorterWrite(
001766    const VdbeCursor *pCsr,         /* Sorter cursor */
001767    Mem *pVal                       /* Memory cell containing record */
001768  ){
001769    VdbeSorter *pSorter;
001770    int rc = SQLITE_OK;             /* Return Code */
001771    SorterRecord *pNew;             /* New list element */
001772    int bFlush;                     /* True to flush contents of memory to PMA */
001773    int nReq;                       /* Bytes of memory required */
001774    int nPMA;                       /* Bytes of PMA space required */
001775    int t;                          /* serial type of first record field */
001776  
001777    assert( pCsr->eCurType==CURTYPE_SORTER );
001778    pSorter = pCsr->uc.pSorter;
001779    getVarint32((const u8*)&pVal->z[1], t);
001780    if( t>0 && t<10 && t!=7 ){
001781      pSorter->typeMask &= SORTER_TYPE_INTEGER;
001782    }else if( t>10 && (t & 0x01) ){
001783      pSorter->typeMask &= SORTER_TYPE_TEXT;
001784    }else{
001785      pSorter->typeMask = 0;
001786    }
001787  
001788    assert( pSorter );
001789  
001790    /* Figure out whether or not the current contents of memory should be
001791    ** flushed to a PMA before continuing. If so, do so.
001792    **
001793    ** If using the single large allocation mode (pSorter->aMemory!=0), then
001794    ** flush the contents of memory to a new PMA if (a) at least one value is
001795    ** already in memory and (b) the new value will not fit in memory.
001796    ** 
001797    ** Or, if using separate allocations for each record, flush the contents
001798    ** of memory to a PMA if either of the following are true:
001799    **
001800    **   * The total memory allocated for the in-memory list is greater 
001801    **     than (page-size * cache-size), or
001802    **
001803    **   * The total memory allocated for the in-memory list is greater 
001804    **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
001805    */
001806    nReq = pVal->n + sizeof(SorterRecord);
001807    nPMA = pVal->n + sqlite3VarintLen(pVal->n);
001808    if( pSorter->mxPmaSize ){
001809      if( pSorter->list.aMemory ){
001810        bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
001811      }else{
001812        bFlush = (
001813            (pSorter->list.szPMA > pSorter->mxPmaSize)
001814         || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
001815        );
001816      }
001817      if( bFlush ){
001818        rc = vdbeSorterFlushPMA(pSorter);
001819        pSorter->list.szPMA = 0;
001820        pSorter->iMemory = 0;
001821        assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
001822      }
001823    }
001824  
001825    pSorter->list.szPMA += nPMA;
001826    if( nPMA>pSorter->mxKeysize ){
001827      pSorter->mxKeysize = nPMA;
001828    }
001829  
001830    if( pSorter->list.aMemory ){
001831      int nMin = pSorter->iMemory + nReq;
001832  
001833      if( nMin>pSorter->nMemory ){
001834        u8 *aNew;
001835        int iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory;
001836        int nNew = pSorter->nMemory * 2;
001837        while( nNew < nMin ) nNew = nNew*2;
001838        if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
001839        if( nNew < nMin ) nNew = nMin;
001840  
001841        aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
001842        if( !aNew ) return SQLITE_NOMEM_BKPT;
001843        pSorter->list.pList = (SorterRecord*)&aNew[iListOff];
001844        pSorter->list.aMemory = aNew;
001845        pSorter->nMemory = nNew;
001846      }
001847  
001848      pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
001849      pSorter->iMemory += ROUND8(nReq);
001850      if( pSorter->list.pList ){
001851        pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
001852      }
001853    }else{
001854      pNew = (SorterRecord *)sqlite3Malloc(nReq);
001855      if( pNew==0 ){
001856        return SQLITE_NOMEM_BKPT;
001857      }
001858      pNew->u.pNext = pSorter->list.pList;
001859    }
001860  
001861    memcpy(SRVAL(pNew), pVal->z, pVal->n);
001862    pNew->nVal = pVal->n;
001863    pSorter->list.pList = pNew;
001864  
001865    return rc;
001866  }
001867  
001868  /*
001869  ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
001870  ** of the data stored in aFile[1] is the same as that used by regular PMAs,
001871  ** except that the number-of-bytes varint is omitted from the start.
001872  */
001873  static int vdbeIncrPopulate(IncrMerger *pIncr){
001874    int rc = SQLITE_OK;
001875    int rc2;
001876    i64 iStart = pIncr->iStartOff;
001877    SorterFile *pOut = &pIncr->aFile[1];
001878    SortSubtask *pTask = pIncr->pTask;
001879    MergeEngine *pMerger = pIncr->pMerger;
001880    PmaWriter writer;
001881    assert( pIncr->bEof==0 );
001882  
001883    vdbeSorterPopulateDebug(pTask, "enter");
001884  
001885    vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
001886    while( rc==SQLITE_OK ){
001887      int dummy;
001888      PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
001889      int nKey = pReader->nKey;
001890      i64 iEof = writer.iWriteOff + writer.iBufEnd;
001891  
001892      /* Check if the output file is full or if the input has been exhausted.
001893      ** In either case exit the loop. */
001894      if( pReader->pFd==0 ) break;
001895      if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
001896  
001897      /* Write the next key to the output. */
001898      vdbePmaWriteVarint(&writer, nKey);
001899      vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
001900      assert( pIncr->pMerger->pTask==pTask );
001901      rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
001902    }
001903  
001904    rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
001905    if( rc==SQLITE_OK ) rc = rc2;
001906    vdbeSorterPopulateDebug(pTask, "exit");
001907    return rc;
001908  }
001909  
001910  #if SQLITE_MAX_WORKER_THREADS>0
001911  /*
001912  ** The main routine for background threads that populate aFile[1] of
001913  ** multi-threaded IncrMerger objects.
001914  */
001915  static void *vdbeIncrPopulateThread(void *pCtx){
001916    IncrMerger *pIncr = (IncrMerger*)pCtx;
001917    void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
001918    pIncr->pTask->bDone = 1;
001919    return pRet;
001920  }
001921  
001922  /*
001923  ** Launch a background thread to populate aFile[1] of pIncr.
001924  */
001925  static int vdbeIncrBgPopulate(IncrMerger *pIncr){
001926    void *p = (void*)pIncr;
001927    assert( pIncr->bUseThread );
001928    return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
001929  }
001930  #endif
001931  
001932  /*
001933  ** This function is called when the PmaReader corresponding to pIncr has
001934  ** finished reading the contents of aFile[0]. Its purpose is to "refill"
001935  ** aFile[0] such that the PmaReader should start rereading it from the
001936  ** beginning.
001937  **
001938  ** For single-threaded objects, this is accomplished by literally reading 
001939  ** keys from pIncr->pMerger and repopulating aFile[0]. 
001940  **
001941  ** For multi-threaded objects, all that is required is to wait until the 
001942  ** background thread is finished (if it is not already) and then swap 
001943  ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
001944  ** been exhausted, this function also launches a new background thread
001945  ** to populate the new aFile[1].
001946  **
001947  ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
001948  */
001949  static int vdbeIncrSwap(IncrMerger *pIncr){
001950    int rc = SQLITE_OK;
001951  
001952  #if SQLITE_MAX_WORKER_THREADS>0
001953    if( pIncr->bUseThread ){
001954      rc = vdbeSorterJoinThread(pIncr->pTask);
001955  
001956      if( rc==SQLITE_OK ){
001957        SorterFile f0 = pIncr->aFile[0];
001958        pIncr->aFile[0] = pIncr->aFile[1];
001959        pIncr->aFile[1] = f0;
001960      }
001961  
001962      if( rc==SQLITE_OK ){
001963        if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
001964          pIncr->bEof = 1;
001965        }else{
001966          rc = vdbeIncrBgPopulate(pIncr);
001967        }
001968      }
001969    }else
001970  #endif
001971    {
001972      rc = vdbeIncrPopulate(pIncr);
001973      pIncr->aFile[0] = pIncr->aFile[1];
001974      if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
001975        pIncr->bEof = 1;
001976      }
001977    }
001978  
001979    return rc;
001980  }
001981  
001982  /*
001983  ** Allocate and return a new IncrMerger object to read data from pMerger.
001984  **
001985  ** If an OOM condition is encountered, return NULL. In this case free the
001986  ** pMerger argument before returning.
001987  */
001988  static int vdbeIncrMergerNew(
001989    SortSubtask *pTask,     /* The thread that will be using the new IncrMerger */
001990    MergeEngine *pMerger,   /* The MergeEngine that the IncrMerger will control */
001991    IncrMerger **ppOut      /* Write the new IncrMerger here */
001992  ){
001993    int rc = SQLITE_OK;
001994    IncrMerger *pIncr = *ppOut = (IncrMerger*)
001995         (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
001996    if( pIncr ){
001997      pIncr->pMerger = pMerger;
001998      pIncr->pTask = pTask;
001999      pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
002000      pTask->file2.iEof += pIncr->mxSz;
002001    }else{
002002      vdbeMergeEngineFree(pMerger);
002003      rc = SQLITE_NOMEM_BKPT;
002004    }
002005    return rc;
002006  }
002007  
002008  #if SQLITE_MAX_WORKER_THREADS>0
002009  /*
002010  ** Set the "use-threads" flag on object pIncr.
002011  */
002012  static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
002013    pIncr->bUseThread = 1;
002014    pIncr->pTask->file2.iEof -= pIncr->mxSz;
002015  }
002016  #endif /* SQLITE_MAX_WORKER_THREADS>0 */
002017  
002018  
002019  
002020  /*
002021  ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
002022  ** two PmaReaders that feed that entry.  Neither of the PmaReaders
002023  ** are advanced.  This routine merely does the comparison.
002024  */
002025  static void vdbeMergeEngineCompare(
002026    MergeEngine *pMerger,  /* Merge engine containing PmaReaders to compare */
002027    int iOut               /* Store the result in pMerger->aTree[iOut] */
002028  ){
002029    int i1;
002030    int i2;
002031    int iRes;
002032    PmaReader *p1;
002033    PmaReader *p2;
002034  
002035    assert( iOut<pMerger->nTree && iOut>0 );
002036  
002037    if( iOut>=(pMerger->nTree/2) ){
002038      i1 = (iOut - pMerger->nTree/2) * 2;
002039      i2 = i1 + 1;
002040    }else{
002041      i1 = pMerger->aTree[iOut*2];
002042      i2 = pMerger->aTree[iOut*2+1];
002043    }
002044  
002045    p1 = &pMerger->aReadr[i1];
002046    p2 = &pMerger->aReadr[i2];
002047  
002048    if( p1->pFd==0 ){
002049      iRes = i2;
002050    }else if( p2->pFd==0 ){
002051      iRes = i1;
002052    }else{
002053      SortSubtask *pTask = pMerger->pTask;
002054      int bCached = 0;
002055      int res;
002056      assert( pTask->pUnpacked!=0 );  /* from vdbeSortSubtaskMain() */
002057      res = pTask->xCompare(
002058          pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey
002059      );
002060      if( res<=0 ){
002061        iRes = i1;
002062      }else{
002063        iRes = i2;
002064      }
002065    }
002066  
002067    pMerger->aTree[iOut] = iRes;
002068  }
002069  
002070  /*
002071  ** Allowed values for the eMode parameter to vdbeMergeEngineInit()
002072  ** and vdbePmaReaderIncrMergeInit().
002073  **
002074  ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
002075  ** SQLITE_MAX_WORKER_THREADS==0).  The other values are only used
002076  ** when there exists one or more separate worker threads.
002077  */
002078  #define INCRINIT_NORMAL 0
002079  #define INCRINIT_TASK   1
002080  #define INCRINIT_ROOT   2
002081  
002082  /* 
002083  ** Forward reference required as the vdbeIncrMergeInit() and
002084  ** vdbePmaReaderIncrInit() routines are called mutually recursively when
002085  ** building a merge tree.
002086  */
002087  static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
002088  
002089  /*
002090  ** Initialize the MergeEngine object passed as the second argument. Once this
002091  ** function returns, the first key of merged data may be read from the 
002092  ** MergeEngine object in the usual fashion.
002093  **
002094  ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
002095  ** objects attached to the PmaReader objects that the merger reads from have
002096  ** already been populated, but that they have not yet populated aFile[0] and
002097  ** set the PmaReader objects up to read from it. In this case all that is
002098  ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
002099  ** its first key.
002100  **
002101  ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use 
002102  ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data 
002103  ** to pMerger.
002104  **
002105  ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
002106  */
002107  static int vdbeMergeEngineInit(
002108    SortSubtask *pTask,             /* Thread that will run pMerger */
002109    MergeEngine *pMerger,           /* MergeEngine to initialize */
002110    int eMode                       /* One of the INCRINIT_XXX constants */
002111  ){
002112    int rc = SQLITE_OK;             /* Return code */
002113    int i;                          /* For looping over PmaReader objects */
002114    int nTree = pMerger->nTree;
002115  
002116    /* eMode is always INCRINIT_NORMAL in single-threaded mode */
002117    assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
002118  
002119    /* Verify that the MergeEngine is assigned to a single thread */
002120    assert( pMerger->pTask==0 );
002121    pMerger->pTask = pTask;
002122  
002123    for(i=0; i<nTree; i++){
002124      if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
002125        /* PmaReaders should be normally initialized in order, as if they are
002126        ** reading from the same temp file this makes for more linear file IO.
002127        ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
002128        ** in use it will block the vdbePmaReaderNext() call while it uses
002129        ** the main thread to fill its buffer. So calling PmaReaderNext()
002130        ** on this PmaReader before any of the multi-threaded PmaReaders takes
002131        ** better advantage of multi-processor hardware. */
002132        rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
002133      }else{
002134        rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
002135      }
002136      if( rc!=SQLITE_OK ) return rc;
002137    }
002138  
002139    for(i=pMerger->nTree-1; i>0; i--){
002140      vdbeMergeEngineCompare(pMerger, i);
002141    }
002142    return pTask->pUnpacked->errCode;
002143  }
002144  
002145  /*
002146  ** The PmaReader passed as the first argument is guaranteed to be an
002147  ** incremental-reader (pReadr->pIncr!=0). This function serves to open
002148  ** and/or initialize the temp file related fields of the IncrMerge
002149  ** object at (pReadr->pIncr).
002150  **
002151  ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
002152  ** in the sub-tree headed by pReadr are also initialized. Data is then 
002153  ** loaded into the buffers belonging to pReadr and it is set to point to 
002154  ** the first key in its range.
002155  **
002156  ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
002157  ** to be a multi-threaded PmaReader and this function is being called in a
002158  ** background thread. In this case all PmaReaders in the sub-tree are 
002159  ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
002160  ** pReadr is populated. However, pReadr itself is not set up to point
002161  ** to its first key. A call to vdbePmaReaderNext() is still required to do
002162  ** that. 
002163  **
002164  ** The reason this function does not call vdbePmaReaderNext() immediately 
002165  ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
002166  ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
002167  ** this entire function is being run by thread (pTask->thread), that will
002168  ** lead to the current background thread attempting to join itself.
002169  **
002170  ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
002171  ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
002172  ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
002173  ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
002174  ** the current PmaReader set to point to the first key in its range.
002175  **
002176  ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
002177  */
002178  static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
002179    int rc = SQLITE_OK;
002180    IncrMerger *pIncr = pReadr->pIncr;
002181    SortSubtask *pTask = pIncr->pTask;
002182    sqlite3 *db = pTask->pSorter->db;
002183  
002184    /* eMode is always INCRINIT_NORMAL in single-threaded mode */
002185    assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
002186  
002187    rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
002188  
002189    /* Set up the required files for pIncr. A multi-theaded IncrMerge object
002190    ** requires two temp files to itself, whereas a single-threaded object
002191    ** only requires a region of pTask->file2. */
002192    if( rc==SQLITE_OK ){
002193      int mxSz = pIncr->mxSz;
002194  #if SQLITE_MAX_WORKER_THREADS>0
002195      if( pIncr->bUseThread ){
002196        rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
002197        if( rc==SQLITE_OK ){
002198          rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
002199        }
002200      }else
002201  #endif
002202      /*if( !pIncr->bUseThread )*/{
002203        if( pTask->file2.pFd==0 ){
002204          assert( pTask->file2.iEof>0 );
002205          rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
002206          pTask->file2.iEof = 0;
002207        }
002208        if( rc==SQLITE_OK ){
002209          pIncr->aFile[1].pFd = pTask->file2.pFd;
002210          pIncr->iStartOff = pTask->file2.iEof;
002211          pTask->file2.iEof += mxSz;
002212        }
002213      }
002214    }
002215  
002216  #if SQLITE_MAX_WORKER_THREADS>0
002217    if( rc==SQLITE_OK && pIncr->bUseThread ){
002218      /* Use the current thread to populate aFile[1], even though this
002219      ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
002220      ** then this function is already running in background thread 
002221      ** pIncr->pTask->thread. 
002222      **
002223      ** If this is the INCRINIT_ROOT object, then it is running in the 
002224      ** main VDBE thread. But that is Ok, as that thread cannot return
002225      ** control to the VDBE or proceed with anything useful until the 
002226      ** first results are ready from this merger object anyway.
002227      */
002228      assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
002229      rc = vdbeIncrPopulate(pIncr);
002230    }
002231  #endif
002232  
002233    if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
002234      rc = vdbePmaReaderNext(pReadr);
002235    }
002236  
002237    return rc;
002238  }
002239  
002240  #if SQLITE_MAX_WORKER_THREADS>0
002241  /*
002242  ** The main routine for vdbePmaReaderIncrMergeInit() operations run in 
002243  ** background threads.
002244  */
002245  static void *vdbePmaReaderBgIncrInit(void *pCtx){
002246    PmaReader *pReader = (PmaReader*)pCtx;
002247    void *pRet = SQLITE_INT_TO_PTR(
002248                    vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
002249                 );
002250    pReader->pIncr->pTask->bDone = 1;
002251    return pRet;
002252  }
002253  #endif
002254  
002255  /*
002256  ** If the PmaReader passed as the first argument is not an incremental-reader
002257  ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
002258  ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
002259  ** this routine to initialize the incremental merge.
002260  ** 
002261  ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1), 
002262  ** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
002263  ** Or, if the IncrMerger is single threaded, the same function is called
002264  ** using the current thread.
002265  */
002266  static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
002267    IncrMerger *pIncr = pReadr->pIncr;   /* Incremental merger */
002268    int rc = SQLITE_OK;                  /* Return code */
002269    if( pIncr ){
002270  #if SQLITE_MAX_WORKER_THREADS>0
002271      assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
002272      if( pIncr->bUseThread ){
002273        void *pCtx = (void*)pReadr;
002274        rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
002275      }else
002276  #endif
002277      {
002278        rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
002279      }
002280    }
002281    return rc;
002282  }
002283  
002284  /*
002285  ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
002286  ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
002287  ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
002288  ** to NULL and return an SQLite error code.
002289  **
002290  ** When this function is called, *piOffset is set to the offset of the
002291  ** first PMA to read from pTask->file. Assuming no error occurs, it is 
002292  ** set to the offset immediately following the last byte of the last
002293  ** PMA before returning. If an error does occur, then the final value of
002294  ** *piOffset is undefined.
002295  */
002296  static int vdbeMergeEngineLevel0(
002297    SortSubtask *pTask,             /* Sorter task to read from */
002298    int nPMA,                       /* Number of PMAs to read */
002299    i64 *piOffset,                  /* IN/OUT: Readr offset in pTask->file */
002300    MergeEngine **ppOut             /* OUT: New merge-engine */
002301  ){
002302    MergeEngine *pNew;              /* Merge engine to return */
002303    i64 iOff = *piOffset;
002304    int i;
002305    int rc = SQLITE_OK;
002306  
002307    *ppOut = pNew = vdbeMergeEngineNew(nPMA);
002308    if( pNew==0 ) rc = SQLITE_NOMEM_BKPT;
002309  
002310    for(i=0; i<nPMA && rc==SQLITE_OK; i++){
002311      i64 nDummy = 0;
002312      PmaReader *pReadr = &pNew->aReadr[i];
002313      rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
002314      iOff = pReadr->iEof;
002315    }
002316  
002317    if( rc!=SQLITE_OK ){
002318      vdbeMergeEngineFree(pNew);
002319      *ppOut = 0;
002320    }
002321    *piOffset = iOff;
002322    return rc;
002323  }
002324  
002325  /*
002326  ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
002327  ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
002328  **
002329  ** i.e.
002330  **
002331  **   nPMA<=16    -> TreeDepth() == 0
002332  **   nPMA<=256   -> TreeDepth() == 1
002333  **   nPMA<=65536 -> TreeDepth() == 2
002334  */
002335  static int vdbeSorterTreeDepth(int nPMA){
002336    int nDepth = 0;
002337    i64 nDiv = SORTER_MAX_MERGE_COUNT;
002338    while( nDiv < (i64)nPMA ){
002339      nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
002340      nDepth++;
002341    }
002342    return nDepth;
002343  }
002344  
002345  /*
002346  ** pRoot is the root of an incremental merge-tree with depth nDepth (according
002347  ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
002348  ** tree, counting from zero. This function adds pLeaf to the tree.
002349  **
002350  ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
002351  ** code is returned and pLeaf is freed.
002352  */
002353  static int vdbeSorterAddToTree(
002354    SortSubtask *pTask,             /* Task context */
002355    int nDepth,                     /* Depth of tree according to TreeDepth() */
002356    int iSeq,                       /* Sequence number of leaf within tree */
002357    MergeEngine *pRoot,             /* Root of tree */
002358    MergeEngine *pLeaf              /* Leaf to add to tree */
002359  ){
002360    int rc = SQLITE_OK;
002361    int nDiv = 1;
002362    int i;
002363    MergeEngine *p = pRoot;
002364    IncrMerger *pIncr;
002365  
002366    rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
002367  
002368    for(i=1; i<nDepth; i++){
002369      nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
002370    }
002371  
002372    for(i=1; i<nDepth && rc==SQLITE_OK; i++){
002373      int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
002374      PmaReader *pReadr = &p->aReadr[iIter];
002375  
002376      if( pReadr->pIncr==0 ){
002377        MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
002378        if( pNew==0 ){
002379          rc = SQLITE_NOMEM_BKPT;
002380        }else{
002381          rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
002382        }
002383      }
002384      if( rc==SQLITE_OK ){
002385        p = pReadr->pIncr->pMerger;
002386        nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
002387      }
002388    }
002389  
002390    if( rc==SQLITE_OK ){
002391      p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
002392    }else{
002393      vdbeIncrFree(pIncr);
002394    }
002395    return rc;
002396  }
002397  
002398  /*
002399  ** This function is called as part of a SorterRewind() operation on a sorter
002400  ** that has already written two or more level-0 PMAs to one or more temp
002401  ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that 
002402  ** can be used to incrementally merge all PMAs on disk.
002403  **
002404  ** If successful, SQLITE_OK is returned and *ppOut set to point to the
002405  ** MergeEngine object at the root of the tree before returning. Or, if an
002406  ** error occurs, an SQLite error code is returned and the final value 
002407  ** of *ppOut is undefined.
002408  */
002409  static int vdbeSorterMergeTreeBuild(
002410    VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
002411    MergeEngine **ppOut        /* Write the MergeEngine here */
002412  ){
002413    MergeEngine *pMain = 0;
002414    int rc = SQLITE_OK;
002415    int iTask;
002416  
002417  #if SQLITE_MAX_WORKER_THREADS>0
002418    /* If the sorter uses more than one task, then create the top-level 
002419    ** MergeEngine here. This MergeEngine will read data from exactly 
002420    ** one PmaReader per sub-task.  */
002421    assert( pSorter->bUseThreads || pSorter->nTask==1 );
002422    if( pSorter->nTask>1 ){
002423      pMain = vdbeMergeEngineNew(pSorter->nTask);
002424      if( pMain==0 ) rc = SQLITE_NOMEM_BKPT;
002425    }
002426  #endif
002427  
002428    for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
002429      SortSubtask *pTask = &pSorter->aTask[iTask];
002430      assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
002431      if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
002432        MergeEngine *pRoot = 0;     /* Root node of tree for this task */
002433        int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
002434        i64 iReadOff = 0;
002435  
002436        if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
002437          rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
002438        }else{
002439          int i;
002440          int iSeq = 0;
002441          pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
002442          if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;
002443          for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
002444            MergeEngine *pMerger = 0; /* New level-0 PMA merger */
002445            int nReader;              /* Number of level-0 PMAs to merge */
002446  
002447            nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
002448            rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
002449            if( rc==SQLITE_OK ){
002450              rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
002451            }
002452          }
002453        }
002454  
002455        if( rc==SQLITE_OK ){
002456  #if SQLITE_MAX_WORKER_THREADS>0
002457          if( pMain!=0 ){
002458            rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
002459          }else
002460  #endif
002461          {
002462            assert( pMain==0 );
002463            pMain = pRoot;
002464          }
002465        }else{
002466          vdbeMergeEngineFree(pRoot);
002467        }
002468      }
002469    }
002470  
002471    if( rc!=SQLITE_OK ){
002472      vdbeMergeEngineFree(pMain);
002473      pMain = 0;
002474    }
002475    *ppOut = pMain;
002476    return rc;
002477  }
002478  
002479  /*
002480  ** This function is called as part of an sqlite3VdbeSorterRewind() operation
002481  ** on a sorter that has written two or more PMAs to temporary files. It sets
002482  ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
002483  ** (for multi-threaded sorters) so that it can be used to iterate through
002484  ** all records stored in the sorter.
002485  **
002486  ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
002487  */
002488  static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
002489    int rc;                         /* Return code */
002490    SortSubtask *pTask0 = &pSorter->aTask[0];
002491    MergeEngine *pMain = 0;
002492  #if SQLITE_MAX_WORKER_THREADS
002493    sqlite3 *db = pTask0->pSorter->db;
002494    int i;
002495    SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
002496    for(i=0; i<pSorter->nTask; i++){
002497      pSorter->aTask[i].xCompare = xCompare;
002498    }
002499  #endif
002500  
002501    rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
002502    if( rc==SQLITE_OK ){
002503  #if SQLITE_MAX_WORKER_THREADS
002504      assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
002505      if( pSorter->bUseThreads ){
002506        int iTask;
002507        PmaReader *pReadr = 0;
002508        SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
002509        rc = vdbeSortAllocUnpacked(pLast);
002510        if( rc==SQLITE_OK ){
002511          pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
002512          pSorter->pReader = pReadr;
002513          if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
002514        }
002515        if( rc==SQLITE_OK ){
002516          rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
002517          if( rc==SQLITE_OK ){
002518            vdbeIncrMergerSetThreads(pReadr->pIncr);
002519            for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
002520              IncrMerger *pIncr;
002521              if( (pIncr = pMain->aReadr[iTask].pIncr) ){
002522                vdbeIncrMergerSetThreads(pIncr);
002523                assert( pIncr->pTask!=pLast );
002524              }
002525            }
002526            for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
002527              /* Check that:
002528              **   
002529              **   a) The incremental merge object is configured to use the
002530              **      right task, and
002531              **   b) If it is using task (nTask-1), it is configured to run
002532              **      in single-threaded mode. This is important, as the
002533              **      root merge (INCRINIT_ROOT) will be using the same task
002534              **      object.
002535              */
002536              PmaReader *p = &pMain->aReadr[iTask];
002537              assert( p->pIncr==0 || (
002538                  (p->pIncr->pTask==&pSorter->aTask[iTask])             /* a */
002539               && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)  /* b */
002540              ));
002541              rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
002542            }
002543          }
002544          pMain = 0;
002545        }
002546        if( rc==SQLITE_OK ){
002547          rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
002548        }
002549      }else
002550  #endif
002551      {
002552        rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
002553        pSorter->pMerger = pMain;
002554        pMain = 0;
002555      }
002556    }
002557  
002558    if( rc!=SQLITE_OK ){
002559      vdbeMergeEngineFree(pMain);
002560    }
002561    return rc;
002562  }
002563  
002564  
002565  /*
002566  ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
002567  ** this function is called to prepare for iterating through the records
002568  ** in sorted order.
002569  */
002570  int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
002571    VdbeSorter *pSorter;
002572    int rc = SQLITE_OK;             /* Return code */
002573  
002574    assert( pCsr->eCurType==CURTYPE_SORTER );
002575    pSorter = pCsr->uc.pSorter;
002576    assert( pSorter );
002577  
002578    /* If no data has been written to disk, then do not do so now. Instead,
002579    ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
002580    ** from the in-memory list.  */
002581    if( pSorter->bUsePMA==0 ){
002582      if( pSorter->list.pList ){
002583        *pbEof = 0;
002584        rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
002585      }else{
002586        *pbEof = 1;
002587      }
002588      return rc;
002589    }
002590  
002591    /* Write the current in-memory list to a PMA. When the VdbeSorterWrite() 
002592    ** function flushes the contents of memory to disk, it immediately always
002593    ** creates a new list consisting of a single key immediately afterwards.
002594    ** So the list is never empty at this point.  */
002595    assert( pSorter->list.pList );
002596    rc = vdbeSorterFlushPMA(pSorter);
002597  
002598    /* Join all threads */
002599    rc = vdbeSorterJoinAll(pSorter, rc);
002600  
002601    vdbeSorterRewindDebug("rewind");
002602  
002603    /* Assuming no errors have occurred, set up a merger structure to 
002604    ** incrementally read and merge all remaining PMAs.  */
002605    assert( pSorter->pReader==0 );
002606    if( rc==SQLITE_OK ){
002607      rc = vdbeSorterSetupMerge(pSorter);
002608      *pbEof = 0;
002609    }
002610  
002611    vdbeSorterRewindDebug("rewinddone");
002612    return rc;
002613  }
002614  
002615  /*
002616  ** Advance to the next element in the sorter.
002617  */
002618  int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
002619    VdbeSorter *pSorter;
002620    int rc;                         /* Return code */
002621  
002622    assert( pCsr->eCurType==CURTYPE_SORTER );
002623    pSorter = pCsr->uc.pSorter;
002624    assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
002625    if( pSorter->bUsePMA ){
002626      assert( pSorter->pReader==0 || pSorter->pMerger==0 );
002627      assert( pSorter->bUseThreads==0 || pSorter->pReader );
002628      assert( pSorter->bUseThreads==1 || pSorter->pMerger );
002629  #if SQLITE_MAX_WORKER_THREADS>0
002630      if( pSorter->bUseThreads ){
002631        rc = vdbePmaReaderNext(pSorter->pReader);
002632        *pbEof = (pSorter->pReader->pFd==0);
002633      }else
002634  #endif
002635      /*if( !pSorter->bUseThreads )*/ {
002636        assert( pSorter->pMerger!=0 );
002637        assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
002638        rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof);
002639      }
002640    }else{
002641      SorterRecord *pFree = pSorter->list.pList;
002642      pSorter->list.pList = pFree->u.pNext;
002643      pFree->u.pNext = 0;
002644      if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
002645      *pbEof = !pSorter->list.pList;
002646      rc = SQLITE_OK;
002647    }
002648    return rc;
002649  }
002650  
002651  /*
002652  ** Return a pointer to a buffer owned by the sorter that contains the 
002653  ** current key.
002654  */
002655  static void *vdbeSorterRowkey(
002656    const VdbeSorter *pSorter,      /* Sorter object */
002657    int *pnKey                      /* OUT: Size of current key in bytes */
002658  ){
002659    void *pKey;
002660    if( pSorter->bUsePMA ){
002661      PmaReader *pReader;
002662  #if SQLITE_MAX_WORKER_THREADS>0
002663      if( pSorter->bUseThreads ){
002664        pReader = pSorter->pReader;
002665      }else
002666  #endif
002667      /*if( !pSorter->bUseThreads )*/{
002668        pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
002669      }
002670      *pnKey = pReader->nKey;
002671      pKey = pReader->aKey;
002672    }else{
002673      *pnKey = pSorter->list.pList->nVal;
002674      pKey = SRVAL(pSorter->list.pList);
002675    }
002676    return pKey;
002677  }
002678  
002679  /*
002680  ** Copy the current sorter key into the memory cell pOut.
002681  */
002682  int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
002683    VdbeSorter *pSorter;
002684    void *pKey; int nKey;           /* Sorter key to copy into pOut */
002685  
002686    assert( pCsr->eCurType==CURTYPE_SORTER );
002687    pSorter = pCsr->uc.pSorter;
002688    pKey = vdbeSorterRowkey(pSorter, &nKey);
002689    if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
002690      return SQLITE_NOMEM_BKPT;
002691    }
002692    pOut->n = nKey;
002693    MemSetTypeFlag(pOut, MEM_Blob);
002694    memcpy(pOut->z, pKey, nKey);
002695  
002696    return SQLITE_OK;
002697  }
002698  
002699  /*
002700  ** Compare the key in memory cell pVal with the key that the sorter cursor
002701  ** passed as the first argument currently points to. For the purposes of
002702  ** the comparison, ignore the rowid field at the end of each record.
002703  **
002704  ** If the sorter cursor key contains any NULL values, consider it to be
002705  ** less than pVal. Even if pVal also contains NULL values.
002706  **
002707  ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
002708  ** Otherwise, set *pRes to a negative, zero or positive value if the
002709  ** key in pVal is smaller than, equal to or larger than the current sorter
002710  ** key.
002711  **
002712  ** This routine forms the core of the OP_SorterCompare opcode, which in
002713  ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
002714  */
002715  int sqlite3VdbeSorterCompare(
002716    const VdbeCursor *pCsr,         /* Sorter cursor */
002717    Mem *pVal,                      /* Value to compare to current sorter key */
002718    int nKeyCol,                    /* Compare this many columns */
002719    int *pRes                       /* OUT: Result of comparison */
002720  ){
002721    VdbeSorter *pSorter;
002722    UnpackedRecord *r2;
002723    KeyInfo *pKeyInfo;
002724    int i;
002725    void *pKey; int nKey;           /* Sorter key to compare pVal with */
002726  
002727    assert( pCsr->eCurType==CURTYPE_SORTER );
002728    pSorter = pCsr->uc.pSorter;
002729    r2 = pSorter->pUnpacked;
002730    pKeyInfo = pCsr->pKeyInfo;
002731    if( r2==0 ){
002732      r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
002733      if( r2==0 ) return SQLITE_NOMEM_BKPT;
002734      r2->nField = nKeyCol;
002735    }
002736    assert( r2->nField==nKeyCol );
002737  
002738    pKey = vdbeSorterRowkey(pSorter, &nKey);
002739    sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
002740    for(i=0; i<nKeyCol; i++){
002741      if( r2->aMem[i].flags & MEM_Null ){
002742        *pRes = -1;
002743        return SQLITE_OK;
002744      }
002745    }
002746  
002747    *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
002748    return SQLITE_OK;
002749  }