diff --git a/.gitignore b/.gitignore
index 4fb6f52c9e85b18545d99db658f480739b70cc16..37223d745b040e77861c6a47af1ffa56771edc88 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
 bin/
 tmp/
 *.tmp
+!/src/main/java/io/mycat/sqlengine/mpp/tmp
 *.bak
 *.swp
 *~.nib
diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java
@@ -0,0 +1,62 @@
+package io.mycat.sqlengine.mpp.tmp;
+
+import io.mycat.net.mysql.RowDataPacket;
+
+import java.util.List;
+
+/**
+ * Heap of rows used to serve ORDER BY ... LIMIT queries.
+ *
+ * @author coderczp-2014-12-17
+ */
+public interface HeapItf {
+
+    /**
+     * Builds the heap from the rows added so far.
+     */
+    void buildHeap();
+
+    /**
+     * Returns the root node of the heap.
+     *
+     * @return the row stored at the root
+     */
+    RowDataPacket getRoot();
+
+    /**
+     * Adds a row to the heap.
+     *
+     * @param row the row to add
+     */
+    void add(RowDataPacket row);
+
+    /**
+     * Returns the rows held by the heap.
+     *
+     * @return the heap data
+     */
+    List<RowDataPacket> getData();
+
+    /**
+     * Replaces the root node.
+     *
+     * @param root the new root row
+     */
+    void setRoot(RowDataPacket root);
+
+    /**
+     * Offers a row to a heap that is already full.
+     *
+     * @param row the candidate row
+     * @return {@code true} if the row was stored in the heap
+     */
+    boolean addIfRequired(RowDataPacket row);
+
+    /**
+     * Heap-sorts the data.
+     *
+     * @param size number of rows requested (the limit)
+     */
+    void heapSort(int size);
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java
@@ -0,0 +1,126 @@
+package io.mycat.sqlengine.mpp.tmp;
+
+import io.mycat.net.mysql.RowDataPacket;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Max-heap of rows ordered by a {@link RowDataCmp}; used to sort rows in
+ * ascending comparator order.
+ *
+ * @author coderczp-2014-12-8
+ */
+public class MaxHeap implements HeapItf {
+
+    private RowDataCmp cmp;
+    private List<RowDataPacket> data;
+
+    /**
+     * @param cmp  comparator that defines the heap order
+     * @param size expected number of rows; not used, the backing list grows
+     *             on demand
+     */
+    public MaxHeap(RowDataCmp cmp, int size) {
+        this.cmp = cmp;
+        this.data = new ArrayList<>();
+    }
+
+    @Override
+    public void buildHeap() {
+        int len = data.size();
+        // Sift down every non-leaf node, starting from the last one.
+        for (int i = len / 2 - 1; i >= 0; i--) {
+            heapifyRecursive(i, len);
+        }
+    }
+
+    /**
+     * Restores the max-heap property for the subtree rooted at {@code i},
+     * looking only at the first {@code size} elements.
+     *
+     * @param i    index of the subtree root
+     * @param size number of leading elements that belong to the heap
+     */
+    protected void heapifyRecursive(int i, int size) {
+        int l = left(i);
+        int r = right(i);
+        int max = i;
+        if (l < size && cmp.compare(data.get(l), data.get(i)) > 0) {
+            max = l;
+        }
+        if (r < size && cmp.compare(data.get(r), data.get(max)) > 0) {
+            max = r;
+        }
+        if (i == max) {
+            return;
+        }
+        swap(i, max);
+        heapifyRecursive(max, size);
+    }
+
+    // Index of the right child of node i (0-based): 2i + 2.
+    private int right(int i) {
+        return (i + 1) << 1;
+    }
+
+    // Index of the left child of node i (0-based): 2i + 1.
+    private int left(int i) {
+        return ((i + 1) << 1) - 1;
+    }
+
+    private void swap(int i, int j) {
+        RowDataPacket tmp = data.get(i);
+        data.set(i, data.get(j));
+        data.set(j, tmp);
+    }
+
+    @Override
+    public RowDataPacket getRoot() {
+        return data.get(0);
+    }
+
+    @Override
+    public void setRoot(RowDataPacket root) {
+        data.set(0, root);
+        heapifyRecursive(0, data.size());
+    }
+
+    @Override
+    public List<RowDataPacket> getData() {
+        return data;
+    }
+
+    @Override
+    public void add(RowDataPacket row) {
+        data.add(row);
+    }
+
+    @Override
+    public boolean addIfRequired(RowDataPacket row) {
+        // The root is the largest row held; a smaller row replaces it.
+        RowDataPacket root = getRoot();
+        if (cmp.compare(row, root) < 0) {
+            setRoot(row);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void heapSort(int size) {
+        final int total = data.size();
+        // Fall back to a full sort when size is out of range.
+        if (size <= 0 || size > total) {
+            size = total;
+        }
+        final int min = size == total ? 0 : (total - size - 1);
+
+        // Swap the root with the last unsorted slot, then re-heapify the rest.
+        for (int i = total - 1; i > min; i--) {
+            swap(0, i);
+            heapifyRecursive(0, i);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java
@@ -0,0 +1,54 @@
+package io.mycat.sqlengine.mpp.tmp;
+
+import io.mycat.net.mysql.RowDataPacket;
+import io.mycat.sqlengine.mpp.OrderCol;
+import io.mycat.sqlengine.mpp.RowDataPacketSorter;
+
+import java.util.Comparator;
+
+/**
+ * Compares two rows on a list of ORDER BY columns.
+ *
+ * @author coderczp-2014-12-8
+ */
+public class RowDataCmp implements Comparator<RowDataPacket> {
+
+    private OrderCol[] orderCols;
+
+    public RowDataCmp(OrderCol[] orderCols) {
+        this.orderCols = orderCols;
+    }
+
+    @Override
+    public int compare(RowDataPacket o1, RowDataPacket o2) {
+        OrderCol[] tmp = this.orderCols;
+        int cmp = 0;
+        int len = tmp.length;
+        // Compare the ORDER BY columns in turn; the first difference decides.
+        int type = OrderCol.COL_ORDER_TYPE_ASC;
+        for (int i = 0; i < len; i++) {
+            int colIndex = tmp[i].colMeta.colIndex;
+            byte[] left = o1.fieldValues.get(colIndex);
+            byte[] right = o2.fieldValues.get(colIndex);
+            // A null ORDER BY value is compared as an empty array to avoid a
+            // NullPointerException.
+            if (left == null) {
+                left = new byte[0];
+            }
+            if (right == null) {
+                right = new byte[0];
+            }
+            if (tmp[i].orderType == type) {
+                cmp = RowDataPacketSorter.compareObject(left, right, tmp[i]);
+            } else {
+                // Any other order type: swap the operands to reverse the order.
+                cmp = RowDataPacketSorter.compareObject(right, left, tmp[i]);
+            }
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return cmp;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java
@@ -0,0 +1,108 @@
+package io.mycat.sqlengine.mpp.tmp;
+
+import io.mycat.net.mysql.RowDataPacket;
+import io.mycat.sqlengine.mpp.OrderCol;
+import io.mycat.sqlengine.mpp.RowDataPacketSorter;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Heap-based sorter that keeps at most offset + limit rows.
+ *
+ * @author coderczp-2014-12-8
+ */
+public class RowDataSorter extends RowDataPacketSorter {
+
+    // Number of rows to keep (= offset + limit)
+    private volatile int total;
+    // Number of rows requested (= limit)
+    private volatile int size;
+    // Heap holding the retained rows
+    private volatile HeapItf heap;
+    // Multi-column comparator
+    private volatile RowDataCmp cmp;
+    // Whether buildHeap has been run on the current heap
+    private volatile boolean hasBuild;
+
+    public RowDataSorter(OrderCol[] orderCols) {
+        super(orderCols);
+        this.cmp = new RowDataCmp(orderCols);
+    }
+
+    /**
+     * Sets the window of rows to return and starts a fresh heap.
+     *
+     * @param start offset of the first row; negative values are treated as 0
+     * @param size  number of rows; non-positive values mean "no limit"
+     */
+    public synchronized void setLimit(int start, int size) {
+        // Tolerate out-of-range arguments.
+        if (start < 0) {
+            start = 0;
+        }
+        if (size <= 0) {
+            this.total = this.size = Integer.MAX_VALUE;
+        } else {
+            // Add in long and clamp: start + size may exceed the int range,
+            // and a negative total would make addRow read an empty heap.
+            long sum = (long) start + size;
+            this.total = sum > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) sum;
+            this.size = size;
+        }
+        // Always sort ascending; the comparator applies the ORDER BY direction.
+        this.heap = new MaxHeap(cmp, total);
+        // The new heap is empty and has not been built yet.
+        this.hasBuild = false;
+    }
+
+    /**
+     * Adds a row. Once the heap holds {@code total} rows, a new row is kept
+     * only if the heap accepts it in place of its root.
+     * {@link #setLimit(int, int)} must have been called first.
+     *
+     * @return {@code true} if the row was kept
+     */
+    @Override
+    public synchronized boolean addRow(RowDataPacket row) {
+        if (heap.getData().size() < total) {
+            heap.add(row);
+            return true;
+        }
+        // The heap is full: build it once, then run the eviction logic.
+        if (heap.getData().size() == total && hasBuild == false) {
+            heap.buildHeap();
+            hasBuild = true;
+        }
+        return heap.addIfRequired(row);
+    }
+
+    /**
+     * Sorts the retained rows and returns them.
+     * {@link #setLimit(int, int)} must have been called first.
+     */
+    @Override
+    public List<RowDataPacket> getSortedResult() {
+        final List<RowDataPacket> data = heap.getData();
+        if (data.size() < 2) {
+            return data;
+        }
+
+        // total - size is normally the offset; fewer rows than that means none qualify.
+        if (total - size > data.size()) {
+            return Collections.emptyList();
+        }
+
+        // Build the max-heap if needed, then heap-sort.
+        if (!hasBuild) {
+            heap.buildHeap();
+        }
+        heap.heapSort(this.size);
+        return heap.getData();
+    }
+
+    public RowDataCmp getCmp() {
+        return cmp;
+    }
+
+}
\ No newline at end of file