From fb7250349e9a895dd1d01faf1fa747776fd7abd5 Mon Sep 17 00:00:00 2001 From: hedyn Date: Sun, 29 Oct 2017 15:05:52 +0800 Subject: [PATCH 01/13] =?UTF-8?q?=E4=BF=AE=E6=94=B9pom?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 125 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index b4cf451..358a468 100644 --- a/pom.xml +++ b/pom.xml @@ -1,11 +1,6 @@ 4.0.0 - - net.apexes - apexes-root - 0.0.1 - net.apexes.fqueue fqueue 1.1.0 @@ -32,4 +27,129 @@ scm:git:git@git.oschina.net:apexes/fqueue.git git@git.oschina.net:apexes/fqueue.git + + + + alimaven + http://maven.aliyun.com/nexus/content/groups/public + + true + + + false + always + fail + + + + + + alimaven + http://maven.aliyun.com/nexus/content/groups/public + + + + + 1.7 + 1.7 + + + 3.6.1 + 3.0.2 + 3.0.1 + 2.10.4 + 2.19.1 + 1.6 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${version.maven-compiler-plugin} + + ${maven.compiler.source} + ${maven.compiler.target} + UTF-8 + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${version.maven-surefire-plugin} + + true + + + + + + + + release + + + + + org.apache.maven.plugins + maven-source-plugin + ${version.maven-source-plugin} + + + package + + jar-no-fork + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${version.maven-javadoc-plugin} + + UTF-8 + -Xdoclint:none + + + + package + + jar + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + ${version.maven-gpg-plugin} + + + install + + sign + + + + + + + + + oss + https://oss.sonatype.org/content/repositories/snapshots/ + + + oss + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + \ No newline at end of file -- Gitee From ad135067343a3deb1a5825ae460fb54cb3043359 Mon Sep 17 00:00:00 2001 From: hedyn Date: Sun, 29 Oct 2017 17:53:39 +0800 Subject: [PATCH 02/13] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ignore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7f1f8e4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.* +/target/ +/**/.* +/*.iml +/**/*.iml + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +!/.gitignore -- Gitee From dd333dfd064b37a40a3bab6d14061970ca75f24b Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:19:04 +0800 Subject: [PATCH 03/13] =?UTF-8?q?=E4=BF=AE=E5=A4=8D#IG5AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/net/apexes/fqueue/FQueue.java | 174 +++++----- src/main/java/net/apexes/fqueue/FSQueue.java | 274 ++++++++------- .../fqueue/exception/FileEOFException.java | 43 +-- .../fqueue/exception/FileFormatException.java | 41 ++- .../net/apexes/fqueue/internal/Entity.java | 45 ++- .../net/apexes/fqueue/internal/Index.java | 326 +++++++++--------- .../net/apexes/fqueue/demo/FQueueDemo2.java | 30 ++ 7 files changed, 481 insertions(+), 452 deletions(-) create mode 100644 src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java diff --git a/src/main/java/net/apexes/fqueue/FQueue.java b/src/main/java/net/apexes/fqueue/FQueue.java index a192944..ecef51d 100644 --- a/src/main/java/net/apexes/fqueue/FQueue.java +++ b/src/main/java/net/apexes/fqueue/FQueue.java @@ -12,120 +12,124 @@ import net.apexes.fqueue.exception.FileFormatException; /** * 基于文件系统的持久化队列 - * + * * @author HeDYn * @author sunli */ public class FQueue extends AbstractQueue implements Serializable { - private static final long serialVersionUID = -1L; + private static final long serialVersionUID = -1L; - private FSQueue fsQueue = null; - private Lock lock = new ReentrantReadWriteLock().writeLock(); - - public FQueue(String path) throws IOException, FileFormatException { - fsQueue = new FSQueue(path); - } + private FSQueue fsQueue = null; + private Lock lock = new ReentrantReadWriteLock().writeLock(); - /** - * 创建一个持久化队列 - * @param path 文件的存储路径 - * @param entityLimitLength 存储数据的单个文件的大小 - * @throws IOException - * @throws FileFormatException - */ - public FQueue(String path, int entityLimitLength) throws IOException, FileFormatException { - fsQueue = new FSQueue(path, entityLimitLength); - } - - public FQueue(File dir) throws IOException, FileFormatException { - fsQueue = new FSQueue(dir); - } - - /** + public FQueue(String path) throws IOException, FileFormatException { + fsQueue = new FSQueue(path); + } + + /** + * 创建一个持久化队列 + * + * @param path 文件的存储路径 + * @param entityLimitLength 存储数据的单个文件的大小 + * @throws IOException + * @throws FileFormatException + */ + public FQueue(String path, int entityLimitLength) throws IOException, FileFormatException { + fsQueue = new FSQueue(path, entityLimitLength); + } + + public FQueue(File dir) throws IOException, FileFormatException { + fsQueue = new FSQueue(dir); + } + + /** * 创建一个持久化队列 - * @param path 文件的存储目录 + * + * @param dir 文件的存储目录 * @param entityLimitLength 存储数据的单个文件的大小 * @throws IOException * @throws FileFormatException */ - public FQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { + public FQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { fsQueue = new FSQueue(dir, entityLimitLength); } - @Override - public Iterator iterator() { - throw new UnsupportedOperationException("iterator Unsupported now"); - } + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("iterator Unsupported now"); + } - @Override - public int size() { - return fsQueue.getQueueSize(); - } + @Override + public int size() { + return fsQueue.getQueueSize(); + } - @Override - public boolean offer(byte[] e) { - try { - lock.lock(); - fsQueue.add(e); - return true; - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (FileFormatException ex) { - } finally { - lock.unlock(); - } - return false; - } + @Override + public boolean offer(byte[] e) { + try { + lock.lock(); + fsQueue.add(e); + return true; + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (FileFormatException ex) { + } finally { + lock.unlock(); + } + return false; + } - @Override - public byte[] peek() { + @Override + public byte[] peek() { try { - lock.lock(); - return fsQueue.readNext(); - } catch (IOException ex) { - return null; - } catch (FileFormatException ex) { - return null; - } finally { - lock.unlock(); - } - } + lock.lock(); + return fsQueue.readNext(); + } catch (IOException ex) { + return null; + } catch (FileFormatException ex) { + return null; + } finally { + lock.unlock(); + } + } - @Override - public byte[] poll() { - try { - lock.lock(); - return fsQueue.readNextAndRemove(); - } catch (IOException ex) { - return null; - } catch (FileFormatException ex) { - return null; - } finally { - lock.unlock(); - } - } - - @Override + @Override + public byte[] poll() { + try { + lock.lock(); + return fsQueue.readNextAndRemove(); + } catch (IOException ex) { + return null; + } catch (FileFormatException ex) { + return null; + } finally { + lock.unlock(); + } + } + + @Override public void clear() { - try { + try { lock.lock(); fsQueue.clear(); } catch (IOException ex) { throw new RuntimeException(ex); } catch (FileFormatException e) { + // ignore } finally { lock.unlock(); } } /** - * 关闭文件队列 - * @throws IOException - * @throws FileFormatException - */ - public void close() throws IOException, FileFormatException { - if (fsQueue != null) { - fsQueue.close(); - } - } + * 关闭文件队列 + * + * @throws IOException + * @throws FileFormatException + */ + public void close() throws IOException, FileFormatException { + if (fsQueue != null) { + fsQueue.close(); + } + } } diff --git a/src/main/java/net/apexes/fqueue/FSQueue.java b/src/main/java/net/apexes/fqueue/FSQueue.java index 5b647b9..e55aad2 100644 --- a/src/main/java/net/apexes/fqueue/FSQueue.java +++ b/src/main/java/net/apexes/fqueue/FSQueue.java @@ -15,67 +15,63 @@ import net.apexes.fqueue.internal.Index; * @author sunli */ public class FSQueue { - private int entityLimitLength; - private String path = null; - /** - * 文件操作实例 - */ - private Index idx = null; - private Entity writerHandle = null; - private Entity readerHandle = null; - /** - * 文件操作位置信息 - */ - private int readerIndex = -1; - private int writerIndex = -1; - - public FSQueue(String dir) throws IOException, FileFormatException { - this(new File(dir)); - } - - /** - * 在指定的目录中,以fileLimitLength为单个数据文件的最大大小限制初始化队列存储 - * - * @param dir - * 队列数据存储的路径 - * @param entityLimitLength - * 单个数据文件的大小,不能超过2G - * @throws IOException - * @throws FileFormatException - */ - public FSQueue(String dir, int entityLimitLength) throws IOException, FileFormatException { - this(new File(dir), entityLimitLength); - } - - public FSQueue(File dir) throws IOException, FileFormatException { - this(dir, 1024 * 1024 * 2); - } - - /** + private int entityLimitLength; + private String path = null; + /** + * 文件操作实例 + */ + private Index idx = null; + private Entity writerHandle = null; + private Entity readerHandle = null; + /** + * 文件操作位置信息 + */ + private int readerIndex = -1; + private int writerIndex = -1; + + public FSQueue(String dir) throws IOException, FileFormatException { + this(new File(dir)); + } + + /** * 在指定的目录中,以fileLimitLength为单个数据文件的最大大小限制初始化队列存储 * - * @param dir - * 队列数据存储的目录 - * @param entityLimitLength - * 单个数据文件的大小,不能超过2G + * @param dir 队列数据存储的路径 + * @param entityLimitLength 单个数据文件的大小,不能超过2G * @throws IOException * @throws FileFormatException */ - public FSQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { - if (dir.exists() == false && dir.isDirectory() == false) { + public FSQueue(String dir, int entityLimitLength) throws IOException, FileFormatException { + this(new File(dir), entityLimitLength); + } + + public FSQueue(File dir) throws IOException, FileFormatException { + this(dir, 1024 * 1024 * 2); + } + + /** + * 在指定的目录中,以fileLimitLength为单个数据文件的最大大小限制初始化队列存储 + * + * @param dir 队列数据存储的目录 + * @param entityLimitLength 单个数据文件的大小,不能超过2G + * @throws IOException + * @throws FileFormatException + */ + public FSQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { + if (dir.exists() == false && dir.isDirectory() == false) { if (dir.mkdirs() == false) { throw new IOException("create dir error"); } } - this.entityLimitLength = entityLimitLength; + this.entityLimitLength = entityLimitLength; path = dir.getAbsolutePath(); // 打开索引文件 idx = new Index(path); initHandle(); - } - - private void initHandle() throws IOException, FileFormatException { - writerIndex = idx.getWriterIndex(); + } + + private void initHandle() throws IOException, FileFormatException { + writerIndex = idx.getWriterIndex(); readerIndex = idx.getReaderIndex(); writerHandle = new Entity(path, writerIndex, entityLimitLength, idx); if (readerIndex == writerIndex) { @@ -83,82 +79,81 @@ public class FSQueue { } else { readerHandle = new Entity(path, readerIndex, entityLimitLength, idx); } - } - - /** - * 一个文件的数据写入达到fileLimitLength的时候,滚动到下一个文件实例 - * - * @throws IOException - * @throws FileFormatException - */ - private void rotateNextLogWriter() throws IOException, FileFormatException { - writerIndex = writerIndex + 1; - writerHandle.putNextFileNumber(writerIndex); - if (readerHandle != writerHandle) { - writerHandle.close(); - } - idx.putWriterIndex(writerIndex); - writerHandle = new Entity(path, writerIndex, entityLimitLength, idx, true); - } - - /** - * 向队列存储添加一个字符串 - * - * @param message - * message - * @throws IOException - * @throws FileFormatException - */ - public void add(String message) throws IOException, FileFormatException { - add(message.getBytes()); - } - - /** - * 向队列存储添加一个byte数组 - * - * @param message - * @throws IOException - * @throws FileFormatException - */ - public void add(byte[] message) throws IOException, FileFormatException { - short status = writerHandle.write(message); - if (status == Entity.WRITEFULL) { - rotateNextLogWriter(); - status = writerHandle.write(message); - } - if (status == Entity.WRITESUCCESS) { - idx.incrementSize(); - } - } + } + + /** + * 一个文件的数据写入达到fileLimitLength的时候,滚动到下一个文件实例 + * + * @throws IOException + * @throws FileFormatException + */ + private void rotateNextLogWriter() throws IOException, FileFormatException { + writerIndex = writerIndex + 1; + writerHandle.putNextFileNumber(writerIndex); + if (readerHandle != writerHandle) { + writerHandle.close(); + } + idx.putWriterIndex(writerIndex); + writerHandle = new Entity(path, writerIndex, entityLimitLength, idx, true); + } + + /** + * 向队列存储添加一个字符串 + * + * @param message message + * @throws IOException + * @throws FileFormatException + */ + public void add(String message) throws IOException, FileFormatException { + add(message.getBytes()); + } + + /** + * 向队列存储添加一个byte数组 + * + * @param message + * @throws IOException + * @throws FileFormatException + */ + public void add(byte[] message) throws IOException, FileFormatException { + short status = writerHandle.write(message); + if (status == Entity.WRITEFULL) { + rotateNextLogWriter(); + status = writerHandle.write(message); + } + if (status == Entity.WRITESUCCESS) { + idx.incrementSize(); + } + } private byte[] read(boolean commit) throws IOException, FileFormatException { - byte[] bytes = null; - try { - bytes = readerHandle.read(commit); - } catch (FileEOFException e) { - int nextFileNumber = readerHandle.getNextFileNumber(); + byte[] bytes; + try { + bytes = readerHandle.read(commit); + } catch (FileEOFException e) { + int nextFileNumber = readerHandle.getNextFileNumber(); readerHandle.reset(); File deleteFile = readerHandle.getFile(); - readerHandle.close(); + readerHandle.close(); deleteFile.delete(); // 更新下一次读取的位置和索引 - idx.putReaderPosition(Entity.MESSAGE_START_POSITION); - idx.putReaderIndex(nextFileNumber); - if (writerHandle.getCurrentFileNumber() == nextFileNumber) { - readerHandle = writerHandle; - } else { - readerHandle = new Entity(path, nextFileNumber, entityLimitLength, idx); - } - try { - bytes = readerHandle.read(commit); - } catch (FileEOFException e1) { - throw new FileFormatException(e1); - } - } - if (bytes != null) { - idx.decrementSize(); - } - return bytes; + idx.putReaderPosition(Entity.MESSAGE_START_POSITION); + idx.putReaderIndex(nextFileNumber); + if (writerHandle.getCurrentFileNumber() == nextFileNumber) { + readerHandle = writerHandle; + } else { + readerHandle = new Entity(path, nextFileNumber, entityLimitLength, idx); + } + try { + bytes = readerHandle.read(commit); + } catch (FileEOFException e1) { + throw new FileFormatException(e1); + } + } + if (commit && bytes != null) { + idx.decrementSize(); + } + return bytes; } /** @@ -172,28 +167,29 @@ public class FSQueue { return read(false); } - /** - * 从队列存储中取出最先入队的数据,并移除它 - * @return - * @throws IOException - * @throws FileFormatException - */ - public byte[] readNextAndRemove() throws IOException, FileFormatException { + /** + * 从队列存储中取出最先入队的数据,并移除它 + * + * @return + * @throws IOException + * @throws FileFormatException + */ + public byte[] readNextAndRemove() throws IOException, FileFormatException { return read(true); - } - - public void clear() throws IOException, FileFormatException { - idx.clear(); - initHandle(); - } - - public void close() throws IOException { - readerHandle.close(); - writerHandle.close(); + } + + public void clear() throws IOException, FileFormatException { + idx.clear(); + initHandle(); + } + + public void close() throws IOException { + readerHandle.close(); + writerHandle.close(); idx.close(); - } + } - public int getQueueSize() { - return idx.getSize(); - } + public int getQueueSize() { + return idx.getSize(); + } } diff --git a/src/main/java/net/apexes/fqueue/exception/FileEOFException.java b/src/main/java/net/apexes/fqueue/exception/FileEOFException.java index 1169dc3..d49ff8b 100644 --- a/src/main/java/net/apexes/fqueue/exception/FileEOFException.java +++ b/src/main/java/net/apexes/fqueue/exception/FileEOFException.java @@ -1,30 +1,31 @@ package net.apexes.fqueue.exception; /** - * * @author sunli */ public class FileEOFException extends Exception { - private static final long serialVersionUID = -1L; - - public FileEOFException() { - super(); - } - public FileEOFException(String message) { - super(message); - } - - public FileEOFException(String message, Throwable cause) { - super(message, cause); - } - - public FileEOFException(Throwable cause) { - super(cause); - } - @Override - public Throwable fillInStackTrace() { - return this; - } + private static final long serialVersionUID = -1L; + + public FileEOFException() { + super(); + } + + public FileEOFException(String message) { + super(message); + } + + public FileEOFException(String message, Throwable cause) { + super(message, cause); + } + + public FileEOFException(Throwable cause) { + super(cause); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } } diff --git a/src/main/java/net/apexes/fqueue/exception/FileFormatException.java b/src/main/java/net/apexes/fqueue/exception/FileFormatException.java index 1494249..6bf4009 100644 --- a/src/main/java/net/apexes/fqueue/exception/FileFormatException.java +++ b/src/main/java/net/apexes/fqueue/exception/FileFormatException.java @@ -1,33 +1,32 @@ package net.apexes.fqueue.exception; /** - * * @author sunli */ public class FileFormatException extends Exception { - /** - * - */ - private static final long serialVersionUID = -1L; + /** + * + */ + private static final long serialVersionUID = -1L; - /** - * Constructs an {@code FileFormatException} with {@code null} as its error - * detail message. - */ - public FileFormatException() { - super(); - } + /** + * Constructs an {@code FileFormatException} with {@code null} as its error + * detail message. + */ + public FileFormatException() { + super(); + } - public FileFormatException(String message) { - super(message); - } + public FileFormatException(String message) { + super(message); + } - public FileFormatException(String message, Throwable cause) { - super(message, cause); - } + public FileFormatException(String message, Throwable cause) { + super(message, cause); + } - public FileFormatException(Throwable cause) { - super(cause); - } + public FileFormatException(Throwable cause) { + super(cause); + } } diff --git a/src/main/java/net/apexes/fqueue/internal/Entity.java b/src/main/java/net/apexes/fqueue/internal/Entity.java index 5942c30..1753306 100644 --- a/src/main/java/net/apexes/fqueue/internal/Entity.java +++ b/src/main/java/net/apexes/fqueue/internal/Entity.java @@ -47,7 +47,7 @@ public class Entity { } public Entity(String path, int fileNumber, int fileLimitLength, - Index idx, boolean create) throws IOException, FileFormatException { + Index idx, boolean create) throws IOException, FileFormatException { this.currentFileNumber = fileNumber; this.fileLimitLength = fileLimitLength; this.idx = idx; @@ -57,7 +57,7 @@ public class Entity { createLogEntity(); } else { raFile = new RandomAccessFile(file, "rwd"); - int fileLength = (int)raFile.length(); + int fileLength = (int) raFile.length(); if (fileLength < MESSAGE_START_POSITION) { throw new FileFormatException("file format error"); } @@ -172,7 +172,6 @@ public class Entity { } /** - * * @param commit 如果为 false,则只读取数据,不移动读取指针。 * @return * @throws IOException @@ -238,24 +237,24 @@ public class Entity { } private class Sync implements Runnable { - @Override - public void run() { - while (true) { - if (mappedByteBuffer != null) { - try { - mappedByteBuffer.force(); - } catch (Exception e) { - break; - } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - break; - } - } else { - break; - } - } - } - } + @Override + public void run() { + while (true) { + if (mappedByteBuffer != null) { + try { + mappedByteBuffer.force(); + } catch (Exception e) { + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + break; + } + } else { + break; + } + } + } + } } diff --git a/src/main/java/net/apexes/fqueue/internal/Index.java b/src/main/java/net/apexes/fqueue/internal/Index.java index c25130b..cbd7aa9 100644 --- a/src/main/java/net/apexes/fqueue/internal/Index.java +++ b/src/main/java/net/apexes/fqueue/internal/Index.java @@ -18,197 +18,197 @@ import net.apexes.fqueue.exception.FileFormatException; * @author sunli */ public class Index { - private static final int INDEX_LIMIT_LENGTH = 32; + private static final int INDEX_LIMIT_LENGTH = 32; private static final String INDEX_FILE_NAME = "fq.idx"; - private RandomAccessFile dbRandFile = null; - private FileChannel fc; - private MappedByteBuffer mappedByteBuffer; - - /** - * 文件操作位置信息 - */ - private String magicString = null; - private int version = -1; - private int readerPosition = -1; - private int writerPosition = -1; - private int readerIndex = -1; - private int writerIndex = -1; - private AtomicInteger size = new AtomicInteger(); - - public Index(String path) throws IOException, FileFormatException { - File dbFile = new File(path, INDEX_FILE_NAME); - - // 文件不存在,创建文件 - if (dbFile.exists() == false) { - dbFile.createNewFile(); - dbRandFile = new RandomAccessFile(dbFile, "rwd"); - initIdxFile(); - } else { - dbRandFile = new RandomAccessFile(dbFile, "rwd"); - if (dbRandFile.length() < INDEX_LIMIT_LENGTH) { - throw new FileFormatException("file format error."); - } - byte[] bytes = new byte[INDEX_LIMIT_LENGTH]; - dbRandFile.read(bytes); + private RandomAccessFile dbRandFile; + private FileChannel fc; + private MappedByteBuffer mappedByteBuffer; + + /** + * 文件操作位置信息 + */ + private String magicString = null; + private int version = -1; + private int readerPosition = -1; + private int writerPosition = -1; + private int readerIndex = -1; + private int writerIndex = -1; + private AtomicInteger size = new AtomicInteger(); + + public Index(String path) throws IOException, FileFormatException { + File dbFile = new File(path, INDEX_FILE_NAME); + + // 文件不存在,创建文件 + if (dbFile.exists() == false) { + dbFile.createNewFile(); + dbRandFile = new RandomAccessFile(dbFile, "rwd"); + initIdxFile(); + } else { + dbRandFile = new RandomAccessFile(dbFile, "rwd"); + if (dbRandFile.length() < INDEX_LIMIT_LENGTH) { + throw new FileFormatException("file format error."); + } + byte[] bytes = new byte[INDEX_LIMIT_LENGTH]; + dbRandFile.read(bytes); ByteBuffer buffer = ByteBuffer.wrap(bytes); - bytes = new byte[Entity.MAGIC.getBytes().length]; - buffer.get(bytes); - magicString = new String(bytes); - version = buffer.getInt(); - readerPosition = buffer.getInt(); - writerPosition = buffer.getInt(); - readerIndex = buffer.getInt(); - writerIndex = buffer.getInt(); + bytes = new byte[Entity.MAGIC.getBytes().length]; + buffer.get(bytes); + magicString = new String(bytes); + version = buffer.getInt(); + readerPosition = buffer.getInt(); + writerPosition = buffer.getInt(); + readerIndex = buffer.getInt(); + writerIndex = buffer.getInt(); int sz = buffer.getInt(); if (readerPosition == writerPosition && readerIndex == writerIndex && sz <= 0) { - initIdxFile(); + initIdxFile(); } else { size.set(sz); } - } + } fc = dbRandFile.getChannel(); mappedByteBuffer = fc.map(MapMode.READ_WRITE, 0, INDEX_LIMIT_LENGTH); - } + } private void initIdxFile() throws IOException { - magicString = Entity.MAGIC; - version = 1; - readerPosition = Entity.MESSAGE_START_POSITION; - writerPosition = Entity.MESSAGE_START_POSITION; - readerIndex = 1; - writerIndex = 1; + magicString = Entity.MAGIC; + version = 1; + readerPosition = Entity.MESSAGE_START_POSITION; + writerPosition = Entity.MESSAGE_START_POSITION; + readerIndex = 1; + writerIndex = 1; dbRandFile.setLength(32); dbRandFile.seek(0); dbRandFile.write(magicString.getBytes());// magic dbRandFile.writeInt(version);// 8 version - dbRandFile.writeInt(readerPosition);// 12 reader position - dbRandFile.writeInt(writerPosition);// 16 write position - dbRandFile.writeInt(readerIndex);// 20 reader index - dbRandFile.writeInt(writerIndex);// 24 writer index - dbRandFile.writeInt(0);// 28 size + dbRandFile.writeInt(readerPosition);// 12 reader position + dbRandFile.writeInt(writerPosition);// 16 write position + dbRandFile.writeInt(readerIndex);// 20 reader index + dbRandFile.writeInt(writerIndex);// 24 writer index + dbRandFile.writeInt(0);// 28 size } - + public void clear() throws IOException { mappedByteBuffer.clear(); mappedByteBuffer.force(); initIdxFile(); } - /** - * 记录写位置 - * - * @param pos - */ - public void putWriterPosition(int pos) { - mappedByteBuffer.position(16); - mappedByteBuffer.putInt(pos); - this.writerPosition = pos; - } - - /** - * 记录读取的位置 - * - * @param pos - */ - public void putReaderPosition(int pos) { - mappedByteBuffer.position(12); - mappedByteBuffer.putInt(pos); - this.readerPosition = pos; - } - - /** - * 记录写文件索引 - * - * @param index - */ - public void putWriterIndex(int index) { - mappedByteBuffer.position(24); - mappedByteBuffer.putInt(index); - this.writerIndex = index; - } - - /** - * 记录读取文件索引 - * - * @param index - */ - public void putReaderIndex(int index) { - mappedByteBuffer.position(20); - mappedByteBuffer.putInt(index); - this.readerIndex = index; - } - - public void incrementSize() { - int num = size.incrementAndGet(); - mappedByteBuffer.position(28); - mappedByteBuffer.putInt(num); - } - - public void decrementSize() { - int num = size.decrementAndGet(); - mappedByteBuffer.position(28); - mappedByteBuffer.putInt(num); - } - - public String getMagicString() { - return magicString; - } - - public int getVersion() { - return version; - } - - public int getReaderPosition() { - return readerPosition; - } - - public int getWriterPosition() { - return writerPosition; - } - - public int getReaderIndex() { - return readerIndex; - } - - public int getWriterIndex() { - return writerIndex; - } - - public int getSize() { - return size.get(); - } - - /** - * 关闭索引文件 - */ - public void close() throws IOException { - mappedByteBuffer.force(); + /** + * 记录写位置 + * + * @param pos + */ + public void putWriterPosition(int pos) { + mappedByteBuffer.position(16); + mappedByteBuffer.putInt(pos); + this.writerPosition = pos; + } + + /** + * 记录读取的位置 + * + * @param pos + */ + public void putReaderPosition(int pos) { + mappedByteBuffer.position(12); + mappedByteBuffer.putInt(pos); + this.readerPosition = pos; + } + + /** + * 记录写文件索引 + * + * @param index + */ + public void putWriterIndex(int index) { + mappedByteBuffer.position(24); + mappedByteBuffer.putInt(index); + this.writerIndex = index; + } + + /** + * 记录读取文件索引 + * + * @param index + */ + public void putReaderIndex(int index) { + mappedByteBuffer.position(20); + mappedByteBuffer.putInt(index); + this.readerIndex = index; + } + + public void incrementSize() { + int num = size.incrementAndGet(); + mappedByteBuffer.position(28); + mappedByteBuffer.putInt(num); + } + + public void decrementSize() { + int num = size.decrementAndGet(); + mappedByteBuffer.position(28); + mappedByteBuffer.putInt(num); + } + + public String getMagicString() { + return magicString; + } + + public int getVersion() { + return version; + } + + public int getReaderPosition() { + return readerPosition; + } + + public int getWriterPosition() { + return writerPosition; + } + + public int getReaderIndex() { + return readerIndex; + } + + public int getWriterIndex() { + return writerIndex; + } + + public int getSize() { + return size.get(); + } + + /** + * 关闭索引文件 + */ + public void close() throws IOException { + mappedByteBuffer.force(); mappedByteBuffer.clear(); fc.close(); dbRandFile.close(); mappedByteBuffer = null; fc = null; dbRandFile = null; - } - - public String headerInfo() { - StringBuilder sb = new StringBuilder(); - sb.append(" magicString:"); - sb.append(magicString); - sb.append(" version:"); - sb.append(version); - sb.append(" readerPosition:"); - sb.append(readerPosition); - sb.append(" writerPosition:"); - sb.append(writerPosition); - sb.append(" size:"); - sb.append(size); - sb.append(" readerIndex:"); - sb.append(readerIndex); - sb.append(" writerIndex:"); - sb.append(writerIndex); - return sb.toString(); - } + } + + public String headerInfo() { + StringBuilder sb = new StringBuilder(); + sb.append(" magicString:"); + sb.append(magicString); + sb.append(" version:"); + sb.append(version); + sb.append(" readerPosition:"); + sb.append(readerPosition); + sb.append(" writerPosition:"); + sb.append(writerPosition); + sb.append(" size:"); + sb.append(size); + sb.append(" readerIndex:"); + sb.append(readerIndex); + sb.append(" writerIndex:"); + sb.append(writerIndex); + return sb.toString(); + } } diff --git a/src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java b/src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java new file mode 100644 index 0000000..8748b55 --- /dev/null +++ b/src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2018, apexes.net. All rights reserved. + * + * http://www.apexes.net + * + */ +package net.apexes.fqueue.demo; + +import net.apexes.fqueue.FQueue; + +/** + * @author HeDYn + */ +public class FQueueDemo2 { + + public static void main(String[] args) throws Exception { + FQueue queue = new FQueue(".temp/test-fqueue"); + + queue.offer("abc".getBytes()); + System.out.println("label1: " + queue.size()); + + String data2 = new String(queue.peek()); + System.out.println("label2: " + queue.size()); + System.out.println("label2: data2 -> " + data2); + + String data3 = new String(queue.remove()); + System.out.println("label3: " + queue.size()); + System.out.println("label3: data3 -> " + data3); + } +} -- Gitee From cf665ae6c97877feb14a39f144ab87bdce250291 Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:21:29 +0800 Subject: [PATCH 04/13] =?UTF-8?q?=E4=BF=AE=E5=A4=8D#IG5AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 358a468..ba3274b 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,9 @@ 2.10.4 2.19.1 1.6 + 2.5.2 + 2.7 + 2.5.3 @@ -92,6 +95,16 @@ release + + + org.apache.maven.plugins + maven-release-plugin + ${version.maven-release-plugin} + + false + true + + org.apache.maven.plugins @@ -112,12 +125,11 @@ maven-javadoc-plugin ${version.maven-javadoc-plugin} - UTF-8 -Xdoclint:none - package + attach-javadocs jar @@ -141,9 +153,13 @@ - + + + apexes + http://apexes.net:8125/repository/maven-snapshots/ oss -- Gitee From 43433fdd3a30bef3474dd7a159666584ac10887d Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:22:56 +0800 Subject: [PATCH 05/13] =?UTF-8?q?=E4=BF=AE=E5=A4=8D#IG5AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index ba3274b..5eae4c1 100644 --- a/pom.xml +++ b/pom.xml @@ -1,9 +1,8 @@ - + 4.0.0 net.apexes.fqueue fqueue - 1.1.0 + 1.1.1-SNAPSHOT fqueue FQueue http://git.oschina.net/apexes/fqueue -- Gitee From 6af319d01c3ca20b26ae79b9f41a8ad73fc23088 Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:24:23 +0800 Subject: [PATCH 06/13] =?UTF-8?q?=E4=BF=AE=E5=A4=8D#IG5AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index 5eae4c1..3133688 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,8 @@ 1.7 1.7 + UTF-8 + UTF-8 3.6.1 -- Gitee From 8007a39dfceb4480b811e3b0a0ffd5470565e9ba Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:25:19 +0800 Subject: [PATCH 07/13] [maven-release-plugin] prepare release fqueue-1.1.1 --- pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3133688..d8951f5 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 net.apexes.fqueue fqueue - 1.1.1-SNAPSHOT + 1.1.1 fqueue FQueue http://git.oschina.net/apexes/fqueue @@ -25,7 +25,8 @@ scm:git:git@git.oschina.net:apexes/fqueue.git scm:git:git@git.oschina.net:apexes/fqueue.git git@git.oschina.net:apexes/fqueue.git - + fqueue-1.1.1 + -- Gitee From 1e9cc25c5db7e27f09d222e2510c2f255785978e Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:26:14 +0800 Subject: [PATCH 08/13] [maven-release-plugin] rollback the release of fqueue-1.1.1 --- pom.xml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index d8951f5..3133688 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 net.apexes.fqueue fqueue - 1.1.1 + 1.1.1-SNAPSHOT fqueue FQueue http://git.oschina.net/apexes/fqueue @@ -25,8 +25,7 @@ scm:git:git@git.oschina.net:apexes/fqueue.git scm:git:git@git.oschina.net:apexes/fqueue.git git@git.oschina.net:apexes/fqueue.git - fqueue-1.1.1 - + -- Gitee From 42ed887edd24be3d6d68316799cf34990e889060 Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:27:12 +0800 Subject: [PATCH 09/13] =?UTF-8?q?=E4=BF=AE=E5=A4=8D#IG5AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 3133688..f97d24a 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ scm:git:git@git.oschina.net:apexes/fqueue.git scm:git:git@git.oschina.net:apexes/fqueue.git git@git.oschina.net:apexes/fqueue.git + HEAD -- Gitee From c1268f4959d4331bfcd89dcd14b259ba731dfe49 Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:27:52 +0800 Subject: [PATCH 10/13] [maven-release-plugin] prepare release fqueue-1.1.1 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index f97d24a..147fff6 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 net.apexes.fqueue fqueue - 1.1.1-SNAPSHOT + 1.1.1 fqueue FQueue http://git.oschina.net/apexes/fqueue @@ -25,7 +25,7 @@ scm:git:git@git.oschina.net:apexes/fqueue.git scm:git:git@git.oschina.net:apexes/fqueue.git git@git.oschina.net:apexes/fqueue.git - HEAD + fqueue-1.1.1 -- Gitee From de01f07a34f4d1bd3f51a8c3ef272fa57956cacd Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:29:14 +0800 Subject: [PATCH 11/13] [maven-release-plugin] rollback the release of fqueue-1.1.1 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 147fff6..f97d24a 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 net.apexes.fqueue fqueue - 1.1.1 + 1.1.1-SNAPSHOT fqueue FQueue http://git.oschina.net/apexes/fqueue @@ -25,7 +25,7 @@ scm:git:git@git.oschina.net:apexes/fqueue.git scm:git:git@git.oschina.net:apexes/fqueue.git git@git.oschina.net:apexes/fqueue.git - fqueue-1.1.1 + HEAD -- Gitee From 193ba99b96be1f2119503b6c094cb2b1a6f351d1 Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:30:30 +0800 Subject: [PATCH 12/13] 1.1.1 --- pom.xml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index f97d24a..56c3717 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 1.1.1-SNAPSHOT fqueue FQueue - http://git.oschina.net/apexes/fqueue + https://github.com/hedyn/fqueue @@ -22,9 +22,9 @@ - scm:git:git@git.oschina.net:apexes/fqueue.git - scm:git:git@git.oschina.net:apexes/fqueue.git - git@git.oschina.net:apexes/fqueue.git + https://gitee.com/apexes/fqueue.git + scm:git:https://gitee.com/apexes/fqueue.git + scm:git:https://gitee.com/apexes/fqueue.git HEAD -- Gitee From 478a5945b80ceedcb0e21a9ca68ea370fe55bf10 Mon Sep 17 00:00:00 2001 From: hedyn Date: Mon, 16 Jul 2018 20:31:10 +0800 Subject: [PATCH 13/13] [maven-release-plugin] prepare release fqueue-1.1.1 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 56c3717..33fc456 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 net.apexes.fqueue fqueue - 1.1.1-SNAPSHOT + 1.1.1 fqueue FQueue https://github.com/hedyn/fqueue @@ -25,7 +25,7 @@ https://gitee.com/apexes/fqueue.git scm:git:https://gitee.com/apexes/fqueue.git scm:git:https://gitee.com/apexes/fqueue.git - HEAD + fqueue-1.1.1 -- Gitee