Tuesday 1 September 2015




Sorting Huge Files in Split Seconds


During my project, there was a requirement to sort a file containing 1 lakh (100,000) fixed-length records. I wanted to share the solution I adopted to sort this fast. The implementation was done using Spring Batch and Java NIO.
The k-way merge sort algorithm was used to sort the file. K-way merge sort involves the following steps:

  •   Split the input file into ‘k’ chunks and write them into ‘k’ files in sorted order.
  •   Open the ‘k’ files, read the first line of each and compare them. The least line is written to the final file, and the file pointer of the file from which the least line was taken is advanced to read the next line.
  •   Step 2 is repeated until all the records in all the ‘k’ files have been read through.
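Before getting into the Spring Batch specifics, here is a minimal, self-contained sketch of the merge phase in plain Java, assuming the chunk files chunk_1 … chunk_k already exist in sorted order. The file names and the PriorityQueue-based comparison are illustrative only, not the post's actual code (which, as described later, merges index records over asynchronous channels):

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    // Pairs a line with the index of the chunk file it came from
    private static class Head {
        final String line;
        final int fileIndex;
        Head(String line, int fileIndex) { this.line = line; this.fileIndex = fileIndex; }
    }

    public static void main(String[] args) throws IOException {
        int k = 4; // illustrative chunk count
        List<BufferedReader> readers = new ArrayList<>();
        for (int i = 1; i <= k; i++) {
            readers.add(Files.newBufferedReader(Paths.get("chunk_" + i), StandardCharsets.UTF_8));
        }
        // Min-heap keeps the least of the current head lines on top
        PriorityQueue<Head> heap = new PriorityQueue<>(Comparator.comparing((Head h) -> h.line));
        for (int i = 0; i < k; i++) {
            String line = readers.get(i).readLine();
            if (line != null) heap.add(new Head(line, i));
        }
        try (BufferedWriter out = Files.newBufferedWriter(Paths.get("sorted_output"), StandardCharsets.UTF_8)) {
            while (!heap.isEmpty()) {
                Head least = heap.poll(); // step 2: the least line wins
                out.write(least.line);
                out.newLine();
                // advance the pointer of the file the least line was taken from
                String next = readers.get(least.fileIndex).readLine();
                if (next != null) heap.add(new Head(next, least.fileIndex));
            }
        }
        for (BufferedReader r : readers) r.close();
    }
}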


As mentioned earlier, Spring Batch is used for the implementation. Spring Batch provides the concept of chunking: a chunk of data is read from the input file, processed and written to an output resource. Given a large input file, we need to split the file into ‘k’ chunks, sort the individual records within each chunk, and write each chunk to a file named ‘chunk_k’; we use Spring Batch to process these chunks within the file.
To speed up processing, we need individual threads to cater to the processing of each chunk of data. This means the same input file is read by multiple threads. Here the number of threads can be considered equal to the number of chunks the file has to be split into; in that sense, ‘k’ threads will be accessing ‘k’ chunks of data from the input file, as shown in the diagram below:



The threads are configured as below:
<beans:bean id="taskExecutor"
        class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    <beans:property name="concurrencyLimit" value="10"/>
</beans:bean>
The number of chunks for the input file is calculated by dividing the size of the file (in bytes) by the chunk size (in bytes). The chunk size is set to 6760 bytes. Now each thread knows that it needs to read at most “Chunk_size” bytes from the input file.
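As a quick illustration, the chunk count can be computed with a ceiling division so that a trailing partial chunk is not lost (the file name and class here are hypothetical):

import java.io.File;

public class ChunkCountSketch {
    public static void main(String[] args) {
        long fileSizeInBytes = new File("input.txt").length(); // placeholder input file
        final long CHUNK_SIZE = 6760;                          // chunk size in bytes, as above
        // Round up so that a final partial chunk is still counted
        long numberOfChunks = (fileSizeInBytes + CHUNK_SIZE - 1) / CHUNK_SIZE;
        System.out.println("Number of chunks: " + numberOfChunks);
    }
}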
But how does a thread know at what position it needs to read from within the file? In other words, how do the threads coordinate among themselves so that each thread is ensured a read from a unique position within the file, one that no other thread will read and conflict with? Clearly, synchronizing the read operation on the input file would hurt our scalability.
To solve this issue, we use a non-blocking ‘compare and swap’ (CAS) concurrent data structure, which gives us optimistic-locking performance. Each thread spins until it wins a file position that no other thread has managed to win. For this we use AtomicReference from the Java concurrency package. The following snippet depicts this:

private final AtomicReference<Byte_Chunk_Pair> Byte_Chunk_Atomic_Ref =
        new AtomicReference<Byte_Chunk_Pair>(new Byte_Chunk_Pair(0, 0));

public Byte_Chunk_Pair incrementByte_ChunkCounterAtomically() {
    Byte_Chunk_Pair prev, newValue;
    do {
        prev = Byte_Chunk_Atomic_Ref.get();
        long noOfBytes = prev.getByteTracker() + READ_SIZE;
        long noOfChunks = prev.getChunkTracker() + 1;
        newValue = new Byte_Chunk_Pair(noOfBytes, noOfChunks);
    } while (!Byte_Chunk_Atomic_Ref.compareAndSet(prev, newValue));
    return prev;
}
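The Byte_Chunk_Pair type itself is not shown in the post; the sketch below is a plausible reconstruction inferred from the calls above. The important property is immutability: compareAndSet swaps whole pair instances, so a pair must never change after construction.

// Hypothetical reconstruction of Byte_Chunk_Pair, inferred from the snippet above
public final class Byte_Chunk_Pair {
    private final long byteTracker;  // byte offset in the input file where this chunk starts
    private final long chunkTracker; // ordinal number of the chunk starting at that offset

    public Byte_Chunk_Pair(long byteTracker, long chunkTracker) {
        this.byteTracker = byteTracker;
        this.chunkTracker = chunkTracker;
    }

    public long getByteTracker()  { return byteTracker; }
    public long getChunkTracker() { return chunkTracker; }
}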
Now each thread individually reads its own chunk from the input file using AsynchronousFileChannel from the java.nio package. This is depicted below:

public SortDTO readChunk() {
    ByteBuffer buffer = ByteBuffer.allocate(AsynchIOChannelHolder.READ_SIZE);
    // increment chunk count and byte count atomically
    Byte_Chunk_Pair previousByte_Chunk_Pair = incrementByte_ChunkCounterAtomically();
    if (previousByte_Chunk_Pair.getByteTracker() > getFileSizeinBytes()) {
        return null;
    }
    Future<Integer> future = getReadChannel().read(buffer, previousByte_Chunk_Pair.getByteTracker());
    SortDTO sortDTO = new SortDTO();
    sortDTO.setFileResult(future);
    sortDTO.setBuffer(buffer);
    sortDTO.setChunkTracker(previousByte_Chunk_Pair.getChunkTracker());
    sortDTO.setByteTracker(previousByte_Chunk_Pair.getByteTracker());
    return sortDTO;
}
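The getReadChannel() helper is not shown in the post; opening the shared channel could look like the sketch below (the file name is a placeholder). An AsynchronousFileChannel is safe for use by multiple concurrent threads, which is what allows all the chunk threads to share one instance.

import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ChannelHolderSketch {
    // Hypothetical: opened once and shared by all chunk threads
    public static AsynchronousFileChannel openReadChannel() throws IOException {
        return AsynchronousFileChannel.open(
                Paths.get("input.txt"), // placeholder input file name
                StandardOpenOption.READ);
    }
}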
AsynchronousFileChannel helps us read from a particular position within the file. More importantly, this is done asynchronously. As seen in the above snippet, the read operation returns a Future object. Any thread, at any point during its execution, can call the get method on this Future to wait for the read to complete and fetch the number of bytes read, as shown in the snippet below.

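(An illustrative fragment, reusing buffer and getReadChannel() from the earlier snippet; not the post's actual code.)

Future<Integer> readResult = getReadChannel().read(buffer, startPosition);
try {
    int bytesRead = readResult.get(); // blocks only if the read is still in flight
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt status
} catch (ExecutionException e) {
    e.printStackTrace(); // the read itself failed
}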


Now the read, process and write operations from each thread run in parallel, as shown below:


We do not need to write the whole line contents into the temporary chunk_1 to chunk_k files. Instead, the following information is written for each line visited in the input file (considering there are n values to compare in a line):
<Starting byte number for this line in the input file>,<Byte length for this line in the input file>,<Field 1 value to compare>,<Field 2 value to compare>….<Field n value to compare>
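A hypothetical helper composing one such index record could look as follows (the names are illustrative; this exact code is not in the post). The resulting string is what gets wrapped into a ByteBuffer for the asynchronous write below.

// Hypothetical helper that composes one index record for a line
static String indexRecord(long startingByteNumber, long byteLengthForLine, String... compareFields) {
    StringBuilder sb = new StringBuilder();
    sb.append(startingByteNumber).append(',').append(byteLengthForLine);
    for (String field : compareFields) {
        sb.append(',').append(field); // the n values used for comparison
    }
    return sb.append(System.lineSeparator()).toString();
}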
For writing we again use the asynchronous channel as below:

SortIOChannelHolder.getInstance().getWriterWorkCompletionHandles()
        .add(channelToWrite.write(ByteBuffer.wrap(sb.toString().getBytes()), pos));


As depicted above, the write also takes the starting byte position, and it returns a Future object. All such Future objects from the different threads are collected into a list, which can later be checked for completion. Since the write happens asynchronously, the thread for each chunk saves the waiting time a blocking write would incur. For a thread pool with a low concurrency limit, this means a thread can be allocated the work of the next chunk to process (if any) sooner.
But there was still an issue. The read was done in terms of bytes. Given that the file contains lines, and that the entity we need to sort is a line, how can a thread safely read complete lines? What if one thread reads part of a line and another thread reads the remaining part of the same line?
To solve this, after reading its chunk, each thread converts the byte buffer into a string and splits the string by the newline character. Incomplete lines can occur only at the first and last lines of a chunk. The first and last lines of each chunk, along with the chunk number, are saved into a ‘Fragment’ object; in other words, a fragment contains the first line, the last line and the chunk number. So if there are k chunks, there are k fragments. The fragment for the first chunk has an empty first line, and the fragment for the last (kth) chunk has an empty last line, because the first line of the first chunk and the last line of the last chunk are already complete lines and need not be captured as fragments.
After creating the fragment, each thread adds its fragment object into a thread-safe priority ‘fragments queue’, an instance of PriorityBlockingQueue. The thread processing the first chunk has the additional responsibility of dequeuing the fragment objects fed by the other chunk threads, in the order of the chunk numbers within the fragments, and constructing complete lines: a complete line is the last line of chunk n concatenated with the first line of chunk n+1. Finally, this thread writes these lines into the chunk_1 file as well, as depicted below.
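A minimal sketch of what the Fragment type and its queue ordering could look like (the class is not shown in the post, so the names here are assumptions):

import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical reconstruction of the Fragment type described above
public class Fragment implements Comparable<Fragment> {
    private final long chunkNumber;
    private final String firstLine; // empty for the first chunk
    private final String lastLine;  // empty for the last chunk

    public Fragment(long chunkNumber, String firstLine, String lastLine) {
        this.chunkNumber = chunkNumber;
        this.firstLine = firstLine;
        this.lastLine = lastLine;
    }

    public long getChunkNumber() { return chunkNumber; }
    public String getFirstLine() { return firstLine; }
    public String getLastLine()  { return lastLine; }

    // Order fragments by chunk number so chunk n can be joined with chunk n+1
    @Override
    public int compareTo(Fragment other) {
        return Long.compare(this.chunkNumber, other.chunkNumber);
    }
}

// The shared queue: each thread offers its fragment, the first-chunk thread drains it in order
// PriorityBlockingQueue<Fragment> fragmentsQueue = new PriorityBlockingQueue<>();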



As mentioned earlier, the write instructions to the temporary files are asynchronous and just return Future objects, which we store in a list. After all the chunks of the input file are processed, we wait on each of these Futures. This is done by registering a job listener (a Spring Batch API) and implementing its afterJob method to wait on the Future objects. This ensures that all the write operations are done before the job completes (and the JVM exits).
// Wait for all the writes to complete
SortIOChannelHolder channelHolder = SortIOChannelHolder.getInstance();
List<Future<Integer>> writerWorkCompletionHandles = channelHolder.getWriterWorkCompletionHandles();
for (Future<Integer> writeTask : writerWorkCompletionHandles) {
    try {
        writeTask.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
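A sketch of how this wait could be hooked into the job lifecycle through Spring Batch's JobExecutionListener (the listener class name is an assumption; it reuses the SortIOChannelHolder from the snippet above):

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

// Hypothetical listener that blocks in afterJob until every chunk write has finished
public class WriteCompletionJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // nothing to do before the job starts
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        List<Future<Integer>> handles =
                SortIOChannelHolder.getInstance().getWriterWorkCompletionHandles();
        for (Future<Integer> writeTask : handles) {
            try {
                writeTask.get(); // block until this asynchronous write is done
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}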
Now that we have finished with the chunk file writing, we need to merge the chunk files into the final output file. This is accomplished by a single thread (the merger thread).
The files chunk_1 to chunk_k are read using the asynchronous file channel and their records are put into a thread-safe queue. The records from the different chunk files are compared and the winning record is chosen. The winning record is parsed to fetch the starting byte number and the byte length.
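Given the index-record format described earlier, that parse could look like this hypothetical helper:

// Hypothetical parse of a winning index record: "<startByte>,<byteLength>,<field1>,..."
static long[] parseStartAndLength(String winningRecord) {
    String[] parts = winningRecord.split(",");
    long startingByteNumber = Long.parseLong(parts[0]);
    long byteLengthForLine = Long.parseLong(parts[1]);
    return new long[] { startingByteNumber, byteLengthForLine };
}

Using this information, a different overload of the read method on the asynchronous file channel is invoked, as shown in the snippet below: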

private long transferDataToOutPutFile(long posInWriterFile, SortableLineDTO sortableDTO) {
    ByteBuffer buffer = null;
    try {
        buffer = ByteBuffer.allocate((int) sortableDTO.getBytesLengthForThisLine());
    } catch (NullPointerException npe) {
        System.out.println("sortableDTO is " + sortableDTO);
        npe.printStackTrace();
    }
    BufferAndPosHolderForWritting dataHolder = new BufferAndPosHolderForWritting();
    dataHolder.setBufferToWrite(buffer);
    dataHolder.setPosToWrite(posInWriterFile);
    posInWriterFile = posInWriterFile + sortableDTO.getBytesLengthForThisLine();
    getReadChannel().read(buffer, sortableDTO.getStartingByteNumberForThisLine(), dataHolder,
            new CompletionHandler<Integer, BufferAndPosHolderForWritting>() {
                @Override
                public void completed(Integer result, BufferAndPosHolderForWritting attachment) {
                    attachment.getBufferToWrite().flip();
                    getListOfWrittingFutures().add(getOutFileWriteChannel().write(
                            attachment.getBufferToWrite(), attachment.getPosToWrite()));
                }

                @Override
                public void failed(Throwable exc, BufferAndPosHolderForWritting attachment) {
                    System.out.println("Some error occurred while writing");
                }
            });
    return posInWriterFile;
}

This overload does not return a Future. Instead, as soon as the asynchronous read completes, the completing thread invokes the operation implemented in the completion handler. As can be seen above, the completion handler retrieves the bytes read from the input file and issues a write of those very same bytes into the final output file. This write operation does return a Future object, and we wait on these Futures for completion after all the write tasks for all lines have been submitted. Since multiple reader threads would have initiated write operations, the position to write at is tallied using an AtomicLong object.
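A minimal sketch of tallying the write position (the field and method names are assumptions):

import java.util.concurrent.atomic.AtomicLong;

public class WritePositionSketch {
    // Hypothetical: shared counter for the next free byte position in the output file
    private static final AtomicLong WRITE_POSITION = new AtomicLong(0);

    // getAndAdd returns the old value, so each caller claims a non-overlapping range
    static long claimWritePosition(long lineLengthInBytes) {
        return WRITE_POSITION.getAndAdd(lineLengthInBytes);
    }
}

This whole process is depicted below: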
Finally, as shown in the above diagram, the wait is done on the Future objects for all the line write instructions so that the JVM exits only after all the writes are completely done.
Using this strategy, with a thread pool concurrency limit of 10 and a chunk size of 6760 bytes, sorting of 1 lakh records completed in 10 seconds.