The MASM Forum

Projects => Rarely Used Projects => RosAsm => Topic started by: guga on July 26, 2020, 01:18:18 PM

Title: Bitonic Sort
Post by: guga on July 26, 2020, 01:18:18 PM
Bitonic Sort algorithm for Double. It works sorting 16 doubles at once. This is, at least, 5 times faster then quicksort algo under the same conditions. It can sort a array or chunk of 16 Real8 values in less then 100 clocks (measured on my AMD Rayzen) compared to 500 clocks on  quicksort.

BitonicSortD function



;;
        BitonicSortD
       
            Sorting doubles.
           
            Old-school non-graphic programming of GPU's dusted off a bunch of classic algorithms that did little or no branching, and no data sharing between processors, but allowed massive parallelism.
            One of those algorithms is bitonic sort, aka network sort, whose only operator is one that compares two values and swaps them if the first is greater than the second.
            Bitonic sort does no conditional branching at all - giving it some advantage where instructions are pipelined. In the abstract, it is an O(n log n log n) algorithm, so, for non-parallel sorts,
            it was left in the dust behind O(n log n) algorithms like quicksort.

            The current function works for sorting Doubles (Real8) and it is at least 5 times faster then quicksort under the same circunstances. The function works
            The first half of the algorithm sorts the two halfs of all 16 Doubles [0,2,4,6] and [1,3,5,7] independently. The second half does a linear merge of the two sorted sequences.
           
            The current algorithm was biased on the excellent work of mischasan and the article from Timothy Furtak in: "Using SIMD registers and Instructions to Enable Instruction-Level Parallelism
            in Sorting Algorithms" written in 09/Jun/2007.


        Arguments:

            pArray (In/Out):  Pointer to a Array of 16 Double (Real8) values to be sorted. The sorted vales will be stored in the input.

    Example of usage:

        [TestSort: R$  136, 151, 216, 40, 13, 215, 24, 104,
                        230, 154, 227, 219, 52, 217, 38, 131]

        call BitonicSortD TestSort

    Macros used:

        ; SSE set of comparitions. Similar to If/Else_If/End_If etc, but used toCompare Doubles

        [SSE_D_If    |#=3| comisd #1 #3 | jn#2 G0> ]
        [SSE_D_Else  | jmp G5> | G0: ]
        [SSE_D_End   | G0: | G5:]
        [SSE_D_Else_If  | SSE_D_Else    | SSE_D_If    #1 #2 #3]
        [SSE_D_If_And   | SSE_D_If #1 #2 #3    | #+3]
        [SSE_D_Else_If_And    | SSE_D_Else    | SSE_D_If_And    #F>L]
        [SSE_D_If_Or  | comisd #1 #3 | j#2 H0>  | #+3 | jmp G0>  | H0:]

        ; Get the min max values of a xmm register and put them on the output
        [SSE_MIN_MAX_D | movups XMM0 X$#1+(16*#3) | movups xmm2 xmm0 | movups XMM1 X$#1+(16*#4) | minpd XMM0 xmm1 | movups X$#2+(16*#3) XMM0 | maxpd XMM2 xmm1 | movups X$#2+(16*#4) XMM2]

            ; Parameter #1 = Input register or memory buffer to load the data to xmm register. The input can be a register or a memory location
            ; Parameter #2 = The output register or Buffer where min max will be written onto. The output can be a register or a memory location
            ; Parameter #3 = The starting position to read/write the data from. It is a integer value from 0 to 7. So, it´s a maximum fo half the size of the Input of 16 Doubles
            ; Parameter #4 = The ending position to read/write the data from. It is a integer value from 0 to 7. So, it´s a maximum fo half the size of the Input of 16 Doubles
            ; It works like this: Ex: SSE_MIN_MAX_D edi, ecx, 0, 1
            ;   Get the data (2 doubles) at Starting Pos * 2 == 0*2 (On this example), and compare them to ending pos*1 (1*2). The comparition is done from the min/max values
            ;   from the the 2 doubles loaded from Pos1 and 2 doubles from Pos2.
            ;   In bytes, since we are dealing with Doubles, the math always multiply the position of those values by 16 (8*2 = 8 from te size of the double and 2 for the interleaving)
            ;   So, the data to be analysed is, in fact 8 bytes * 2 * Wanted Position away from the input. On this example, if we feed 1, we are getting the 1st 2 double values
            ;   from 32 bytes (2*8*1) ahead to the start of the input

    References:

    https://mischasan.wordpress.com/2011/09/02/update-on-bitonic-sse2-sort-of-16-doubles/
    https://github.com/mischasan/sse2

;;


[Size_Of_SortD_Table 128] ; 16(elements)*8(butes. Size of a Double)

; actually it is min max
[SSE_MIN_MAX_D | movups XMM0 X$#1+(16*#3) | movups xmm2 xmm0 | movups XMM1 X$#1+(16*#4) | minpd XMM0 xmm1 | movups X$#2+(16*#3) XMM0 | maxpd XMM2 xmm1 | movups X$#2+(16*#4) XMM2]

Proc BitonicSortD::
    Arguments @pArray
    Structure @pCurTable 128, @Tmp.Data1Dis 0
    Uses esi, edi, edx, ecx

    ; Initially load from Input

    ; Pointer to the Start of the Array of Doubles
    mov edi D@pArray

    ; Ecx points to a allocated local memory of 128 Bytes (In the same format as a local structure) to
    ; temporarily save the 2 halfs of the whole data to be sorted.
    ; The size of 128 is given by: 16 doubles * 8 bytes (size of a double)
    ; And then advances 8 bytes (The size of the next double) from the start to the next Double value.
    ; To make easier to understand, you may think on esi as a variable called "NextTblPos" next Table Position
    mov ecx D@pCurTable | lea esi D$ecx+8

    ; Save at edx the end of the Temporary Table to be compared at the end of the function.
    ;  you may think on edx as a variable called "TableEnd" (End of the Table)
    mov edx ecx | add edx Size_Of_SortD_Table

    ; This is where we are getting the min and max values from the input and saving them on the output variable or buffer we want.
    ; In our case, we are getting the 1st 4 Doubles from edi and copying them onto ecx (our temporary buffer)

    ; The SSE_MIN_MAX_D macro unrolled loads the data from edi from 4 to 4 chunks interleaved by N bytes (all multiple of 16) and store the
    ; result on the output (in ecx, on this case). It is unrolled like this:
    ; movups XMM0 X$#1+(16*#3) | movups xmm2 xmm0 | movups XMM1 X$#1+(16*#4) | minpd XMM0 xmm1 | movups X$#2+(16*#3) XMM0 | maxpd XMM2 xmm1 | movups X$#2+(16*#4) XMM2

    ; Parameter #1 = Input register or memory buffer to load the data
    ; Parameter #2 = The output register or Buffer where min max will be written onto
    ; Parameter #3 = The starting position to read/write the data from. It is a integer value from 0 to 7. So, it´s a maximum fo half the size of the Input of 16 Doubles
    ; Parameter #4 = The ending position to read/write the data from. It is a integer value from 0 to 7. So, it´s a maximum fo half the size of the Input of 16 Doubles
    ; It works like this: Ex: SSE_MIN_MAX_D edi, ecx, 0, 1
    ;   Get the data (2 doubles) at Starting Pos * 2 == 0*2 (On this example), and compare them to ending pos*1 (1*2). The comparition is done from the min/max values
    ;   from the the 2 doubles loaded from Pos1 and 2 doubles from Pos2.
    ;   In bytes, since we are dealing with Doubles, the math always multiply the position of those values by 16 (8*2 = 8 from te size of the double and 2 for the interleaving)
    ;   So, the data to be analysed is, in fact 8 bytes * 2 * Wanted Position away from the input. On this example, if we feed 1, we are getting the 1st 2 double values
    ;   from 32 bytes (2*8*1) ahead to the start of the input

    ; The bitonic technique is, therefore, simply swaping the data in between 2 columns with 8 pairs of Min/Max each (8*2 bytes), and feeding those data with the resultant Min and maximum values found among them.
    ; It does it in 3 steps


    ;  Bitonic step #1 - Use the inputed data in edi to swap the min max positions and store them on output at ecx (Our temporary buffer)

    SSE_MIN_MAX_D edi, ecx, 0, 1
    SSE_MIN_MAX_D edi, ecx, 2, 3
    SSE_MIN_MAX_D edi, ecx, 4, 5
    SSE_MIN_MAX_D edi, ecx, 6, 7


    ; Bitonic step #2 - Now that we already reordered the Inputed pairs of Min/Max onto the Temporary Buffer, is time to we swap those values from the buffer and so the same thing, all over the 2 pairs of columns.
    ; So, at input and output we will always use our temporary buffer in ecx, because we are now swapping  the temporary buffer.
    ; You can see easier from the macro that we are swapping them on a kind of `square` of 2*2 Pos long ordered on a specific way

    ; "Square 1" - From top to bottom (0 to 1) and then bottom to top (3 to 4)
    SSE_MIN_MAX_D ecx, ecx, 0, 3
    SSE_MIN_MAX_D ecx, ecx, 1, 2
    ; "Square 2" - From top to bottom (4 to 5) and then bottom to top (6 to 7)
    SSE_MIN_MAX_D ecx, ecx, 4, 7
    SSE_MIN_MAX_D ecx, ecx, 5, 6
    ; "Square 3" - From top right to left (0 to 1) and bottom right to left (2 to 3)
    SSE_MIN_MAX_D ecx, ecx, 0, 1
    SSE_MIN_MAX_D ecx, ecx, 2, 3
    ; "Square 4" - From top right to left (4 to 5) and bottom right to left (6 to 7)
    SSE_MIN_MAX_D ecx, ecx, 4, 5
    SSE_MIN_MAX_D ecx, ecx, 6, 7

    ;  Bitonic step #3 - We do the same swapping as in the previous step, but using the same kind of square of 4*2 Pos long ordered on the same way as previously

    ; "Square 1" - From top to bottom (0 to 3) and then bottom to top (4 to 7)
    SSE_MIN_MAX_D ecx, ecx, 0, 7
    SSE_MIN_MAX_D ecx, ecx, 1, 6
    SSE_MIN_MAX_D ecx, ecx, 2, 5
    SSE_MIN_MAX_D ecx, ecx, 3, 4

    ; "Square 2" - From top right to left interleaved by 2 pos (0 to 3), (1 to 3), (4 to 6) and then (5 to 7)
    SSE_MIN_MAX_D ecx, ecx, 0, 2
    SSE_MIN_MAX_D ecx, ecx, 1, 3
    SSE_MIN_MAX_D ecx, ecx, 4, 6
    SSE_MIN_MAX_D ecx, ecx, 5, 7

    ; "Square 3" - From top right to left (ordered) by 1 pos (0 to 1), (2 to 3), (4 to 5) and then (6 to 7)
    SSE_MIN_MAX_D ecx, ecx, 0, 1
    SSE_MIN_MAX_D ecx, ecx, 2, 3
    SSE_MIN_MAX_D ecx, ecx, 4, 5
    SSE_MIN_MAX_D ecx, ecx, 6, 7


    ; Now we have 2 "columns" ordered in pairs and we need only to merge them. Each 'column' has this ordering: Column1 [0,2,4,6] and Column2 [1,3,5,7]

    ; And we simply needs to scan among the 2 columns,compare their values and exchange if some is bigger then the other.
    ; Keep track on the next positions, Remembering that:
    ; ecx = The pointer to the start of our temporary table (pCurTable). This pointer will be then updated on each loop.
    ; esi = The pointer to the double on the temporary table. Our imaginary variable "NextTblPos". It is always 8 bytes ahead from the pCurTable
    ; edi = The pointer to the address we are outputing the data. Since we are replacing the input, then it will be our output as well.
    ; edx  = End of our temporary table

    movsd XMM1 X$esi ; xmm1 contains now the next Double from our Table. So we are at NextTblPos (edi+8)

    .Do
        ; Store the Double from the Current Table Pos to xmm0 and compare it to the value of the next position stored at xmm1
        movsd xmm0 X$ecx
        SSE_D_If xmm0 > xmm1

            ; The value of the current position is bigger then the next one. ex: (current) 100 ----> (next) 88
            ; We need to exchange their Positions and get the new value. So, we are looking for 88 rather then 100
            ; Since we are using registers to store their addresses, the faster way to do it is with a simple xcgh instruction.
            ; If we were using local variables the code will be something like this:
                ; mov eax D@pCurTable | mov ebx esi ; pCurTable = NextTblPos and vice-versa
                ; mov D@pCurTable ebx | mov esi eax ; NextTblPos = CurTable

            xchg ecx esi ;  Use xchg, instead. Now pCurTable = NextTblPos and vice-versa: NextTblPos = CurTable

            movsd X$edi XMM1 ; Store the smaller value at xmm1 onto our ouptu at edi.
            ; and update the value to xmm1 for the next comparition. We don´t need to exchange xmm registers,
            ; because we already exchanged thei addresses and, therefore, on the next loop xmm0 will be filed with the updated value
            movsd xmm1 xmm0

        SSE_D_Else

            ; The value of the current position is smaller or equal to the nextt double. Ex: (cur) 88 --- > (Next) 100.
            ; Save it on output at edi
            movsd X$edi XMM0

        SSE_D_End

        ; Update the output to 8 bytes. 8 because this is the size of a double, so we need to update it´s position to the next address to be stored
        add edi 8

        ; And do the same for the input (pCurTable at ecx)at our current table position. But, here we are updating to 16 bytes (2 doubles) because our current table
        ; works in pair tha were previously ordered. So, we need to see the next pair only of the table
        add ecx (2*8)
    .Loop_Until ecx >= edx ; did we reached the end of the table ?


    ; We are almost done. Now we need to check only if there are some pairs that were left over the sorting.
    ; At leats one pair was left over on the past switch, so, since we switched the current table pos at ecx with the next one at esi, we then need to check
    ; if there are more tehn one pairs that were not saved yet

    Do
        movsd XMM0 X$esi | movsd X$edi XMM0
        add edi 8
        add esi (2*8)
    Loop_Until esi >= edx

EndP




Macros used on this function:


SSE_S conditional comparisons set (for double):


[SSE_D_If    |#=3| comisd #1 #3 | jn#2 G0> ]
[SSE_D_Else  | jmp G5> | G0: ]
[SSE_D_End   | G0: | G5:]
[SSE_D_Else_If  | SSE_D_Else    | SSE_D_If    #1 #2 #3]
[SSE_D_If_And   | SSE_D_If #1 #2 #3    | #+3]
[SSE_D_Else_If_And    | SSE_D_Else    | SSE_D_If_And    #F>L]
[SSE_D_If_Or  | comisd #1 #3 | j#2 H0>  | #+3 | jmp G0>  | H0:]

[.SSE_D_If    |#=3| comisd #1 #3 | jn#2 G1>> ]
[.SSE_D_Else  | jmp G6>> | G1: ]
[.SSE_D_End   | G1: | G6:]
[.SSE_D_Else_If  | SSE_D_Else    | SSE_D_If    #1 #2 #3]
[.SSE_D_If_And   | SSE_D_If #1 #2 #3    | #+3]
[.SSE_D_Else_If_And    | SSE_D_Else    | SSE_D_If_And    #F>L]
[.SSE_D_If_Or  | comisd #1 #3 | j#2 H1>>  | #+3 | jmp G1>>  | H1:]




SSE get Min / Max from Doubles

        ; Get the min max values of a xmm register and put them on the output
        [SSE_MIN_MAX_D | movups XMM0 X$#1+(16*#3) | movups xmm2 xmm0 | movups XMM1 X$#1+(16*#4) | minpd XMM0 xmm1 | movups X$#2+(16*#3) XMM0 | maxpd XMM2 xmm1 | movups X$#2+(16*#4) XMM2]

            ; Parameter #1 = Input register or memory buffer to load the data to xmm register. The input can be a register or a memory location
            ; Parameter #2 = The output register or Buffer where min max will be written onto. The output can be a register or a memory location
            ; Parameter #3 = The starting position to read/write the data from. It is a integer value from 0 to 7. So, it´s a maximum fo half the size of the Input of 16 Doubles
            ; Parameter #4 = The ending position to read/write the data from. It is a integer value from 0 to 7. So, it´s a maximum fo half the size of the Input of 16 Doubles
            ; It works like this: Ex: SSE_MIN_MAX_D edi, ecx, 0, 1
            ;   Get the data (2 doubles) at Starting Pos * 2 == 0*2 (On this example), and compare them to ending pos*1 (1*2). The comparition is done from the min/max values
            ;   from the the 2 doubles loaded from Pos1 and 2 doubles from Pos2.
            ;   In bytes, since we are dealing with Doubles, the math always multiply the position of those values by 16 (8*2 = 8 from te size of the double and 2 for the interleaving)
            ;   So, the data to be analysed is, in fact 8 bytes * 2 * Wanted Position away from the input. On this example, if we feed 1, we are getting the 1st 2 double values
            ;   from 32 bytes (2*8*1) ahead to the start of the input