diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7f1f8e48ffe28b098da5aeb37044df5430189cc6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.* +/target/ +/**/.* +/*.iml +/**/*.iml + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +!/.gitignore diff --git a/pom.xml b/pom.xml index b4cf4519cd57734f1d09208c275163e0d5feda4e..33fc456965b8c2e8b87f73c75ade59f2ef88669c 100644 --- a/pom.xml +++ b/pom.xml @@ -1,17 +1,11 @@ - + 4.0.0 - - net.apexes - apexes-root - 0.0.1 - net.apexes.fqueue fqueue - 1.1.0 + 1.1.1 fqueue FQueue - http://git.oschina.net/apexes/fqueue + https://github.com/hedyn/fqueue @@ -28,8 +22,152 @@ - scm:git:git@git.oschina.net:apexes/fqueue.git - scm:git:git@git.oschina.net:apexes/fqueue.git - git@git.oschina.net:apexes/fqueue.git + https://gitee.com/apexes/fqueue.git + scm:git:https://gitee.com/apexes/fqueue.git + scm:git:https://gitee.com/apexes/fqueue.git + fqueue-1.1.1 + + + + alimaven + http://maven.aliyun.com/nexus/content/groups/public + + true + + + false + always + fail + + + + + + alimaven + http://maven.aliyun.com/nexus/content/groups/public + + + + + 1.7 + 1.7 + UTF-8 + UTF-8 + + + 3.6.1 + 3.0.2 + 3.0.1 + 2.10.4 + 2.19.1 + 1.6 + 2.5.2 + 2.7 + 2.5.3 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${version.maven-compiler-plugin} + + ${maven.compiler.source} + ${maven.compiler.target} + UTF-8 + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${version.maven-surefire-plugin} + + true + + + + + + + + release + + + + + org.apache.maven.plugins + maven-release-plugin + ${version.maven-release-plugin} + + false + true + + + + + org.apache.maven.plugins + maven-source-plugin + ${version.maven-source-plugin} + + + package + + jar-no-fork + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${version.maven-javadoc-plugin} + + -Xdoclint:none + + + + attach-javadocs + + jar + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + ${version.maven-gpg-plugin} + + + install + + sign + + + + + + + + + + apexes + http://apexes.net:8125/repository/maven-snapshots/ + + + oss + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + \ No newline at end of file diff --git a/src/main/java/net/apexes/fqueue/FQueue.java b/src/main/java/net/apexes/fqueue/FQueue.java index a192944cddf05697d15057215e83716437b9f94a..ecef51dbdf60697642b3b69ee5f0dd309245b737 100644 --- a/src/main/java/net/apexes/fqueue/FQueue.java +++ b/src/main/java/net/apexes/fqueue/FQueue.java @@ -12,120 +12,124 @@ import net.apexes.fqueue.exception.FileFormatException; /** * 基于文件系统的持久化队列 - * + * * @author HeDYn * @author sunli */ public class FQueue extends AbstractQueue implements Serializable { - private static final long serialVersionUID = -1L; + private static final long serialVersionUID = -1L; - private FSQueue fsQueue = null; - private Lock lock = new ReentrantReadWriteLock().writeLock(); - - public FQueue(String path) throws IOException, FileFormatException { - fsQueue = new FSQueue(path); - } + private FSQueue fsQueue = null; + private Lock lock = new ReentrantReadWriteLock().writeLock(); - /** - * 创建一个持久化队列 - * @param path 文件的存储路径 - * @param entityLimitLength 存储数据的单个文件的大小 - * @throws IOException - * @throws FileFormatException - */ - public FQueue(String path, int entityLimitLength) throws IOException, FileFormatException { - fsQueue = new FSQueue(path, entityLimitLength); - } - - public FQueue(File dir) throws IOException, FileFormatException { - fsQueue = new FSQueue(dir); - } - - /** + public FQueue(String path) throws IOException, FileFormatException { + fsQueue = new FSQueue(path); + } + + /** + * 创建一个持久化队列 + * + * @param path 文件的存储路径 + * @param entityLimitLength 存储数据的单个文件的大小 + * @throws IOException + * @throws FileFormatException + */ + public FQueue(String path, int entityLimitLength) throws IOException, FileFormatException { + fsQueue = new FSQueue(path, entityLimitLength); + } + + public FQueue(File dir) throws IOException, FileFormatException { + fsQueue = new FSQueue(dir); + } + + /** * 创建一个持久化队列 - * @param path 文件的存储目录 + * + * @param dir 文件的存储目录 * @param entityLimitLength 存储数据的单个文件的大小 * @throws IOException * @throws FileFormatException */ - public FQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { + public FQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { fsQueue = new FSQueue(dir, entityLimitLength); } - @Override - public Iterator iterator() { - throw new UnsupportedOperationException("iterator Unsupported now"); - } + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("iterator Unsupported now"); + } - @Override - public int size() { - return fsQueue.getQueueSize(); - } + @Override + public int size() { + return fsQueue.getQueueSize(); + } - @Override - public boolean offer(byte[] e) { - try { - lock.lock(); - fsQueue.add(e); - return true; - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (FileFormatException ex) { - } finally { - lock.unlock(); - } - return false; - } + @Override + public boolean offer(byte[] e) { + try { + lock.lock(); + fsQueue.add(e); + return true; + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (FileFormatException ex) { + } finally { + lock.unlock(); + } + return false; + } - @Override - public byte[] peek() { + @Override + public byte[] peek() { try { - lock.lock(); - return fsQueue.readNext(); - } catch (IOException ex) { - return null; - } catch (FileFormatException ex) { - return null; - } finally { - lock.unlock(); - } - } + lock.lock(); + return fsQueue.readNext(); + } catch (IOException ex) { + return null; + } catch (FileFormatException ex) { + return null; + } finally { + lock.unlock(); + } + } - @Override - public byte[] poll() { - try { - lock.lock(); - return fsQueue.readNextAndRemove(); - } catch (IOException ex) { - return null; - } catch (FileFormatException ex) { - return null; - } finally { - lock.unlock(); - } - } - - @Override + @Override + public byte[] poll() { + try { + lock.lock(); + return fsQueue.readNextAndRemove(); + } catch (IOException ex) { + return null; + } catch (FileFormatException ex) { + return null; + } finally { + lock.unlock(); + } + } + + @Override public void clear() { - try { + try { lock.lock(); fsQueue.clear(); } catch (IOException ex) { throw new RuntimeException(ex); } catch (FileFormatException e) { + // ignore } finally { lock.unlock(); } } /** - * 关闭文件队列 - * @throws IOException - * @throws FileFormatException - */ - public void close() throws IOException, FileFormatException { - if (fsQueue != null) { - fsQueue.close(); - } - } + * 关闭文件队列 + * + * @throws IOException + * @throws FileFormatException + */ + public void close() throws IOException, FileFormatException { + if (fsQueue != null) { + fsQueue.close(); + } + } } diff --git a/src/main/java/net/apexes/fqueue/FSQueue.java b/src/main/java/net/apexes/fqueue/FSQueue.java index 5b647b9f8d2491571ffdf2a84c1d80afca990435..e55aad29228bd87948aaddb50c1b07e78b72aaf4 100644 --- a/src/main/java/net/apexes/fqueue/FSQueue.java +++ b/src/main/java/net/apexes/fqueue/FSQueue.java @@ -15,67 +15,63 @@ import net.apexes.fqueue.internal.Index; * @author sunli */ public class FSQueue { - private int entityLimitLength; - private String path = null; - /** - * 文件操作实例 - */ - private Index idx = null; - private Entity writerHandle = null; - private Entity readerHandle = null; - /** - * 文件操作位置信息 - */ - private int readerIndex = -1; - private int writerIndex = -1; - - public FSQueue(String dir) throws IOException, FileFormatException { - this(new File(dir)); - } - - /** - * 在指定的目录中,以fileLimitLength为单个数据文件的最大大小限制初始化队列存储 - * - * @param dir - * 队列数据存储的路径 - * @param entityLimitLength - * 单个数据文件的大小,不能超过2G - * @throws IOException - * @throws FileFormatException - */ - public FSQueue(String dir, int entityLimitLength) throws IOException, FileFormatException { - this(new File(dir), entityLimitLength); - } - - public FSQueue(File dir) throws IOException, FileFormatException { - this(dir, 1024 * 1024 * 2); - } - - /** + private int entityLimitLength; + private String path = null; + /** + * 文件操作实例 + */ + private Index idx = null; + private Entity writerHandle = null; + private Entity readerHandle = null; + /** + * 文件操作位置信息 + */ + private int readerIndex = -1; + private int writerIndex = -1; + + public FSQueue(String dir) throws IOException, FileFormatException { + this(new File(dir)); + } + + /** * 在指定的目录中,以fileLimitLength为单个数据文件的最大大小限制初始化队列存储 * - * @param dir - * 队列数据存储的目录 - * @param entityLimitLength - * 单个数据文件的大小,不能超过2G + * @param dir 队列数据存储的路径 + * @param entityLimitLength 单个数据文件的大小,不能超过2G * @throws IOException * @throws FileFormatException */ - public FSQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { - if (dir.exists() == false && dir.isDirectory() == false) { + public FSQueue(String dir, int entityLimitLength) throws IOException, FileFormatException { + this(new File(dir), entityLimitLength); + } + + public FSQueue(File dir) throws IOException, FileFormatException { + this(dir, 1024 * 1024 * 2); + } + + /** + * 在指定的目录中,以fileLimitLength为单个数据文件的最大大小限制初始化队列存储 + * + * @param dir 队列数据存储的目录 + * @param entityLimitLength 单个数据文件的大小,不能超过2G + * @throws IOException + * @throws FileFormatException + */ + public FSQueue(File dir, int entityLimitLength) throws IOException, FileFormatException { + if (dir.exists() == false && dir.isDirectory() == false) { if (dir.mkdirs() == false) { throw new IOException("create dir error"); } } - this.entityLimitLength = entityLimitLength; + this.entityLimitLength = entityLimitLength; path = dir.getAbsolutePath(); // 打开索引文件 idx = new Index(path); initHandle(); - } - - private void initHandle() throws IOException, FileFormatException { - writerIndex = idx.getWriterIndex(); + } + + private void initHandle() throws IOException, FileFormatException { + writerIndex = idx.getWriterIndex(); readerIndex = idx.getReaderIndex(); writerHandle = new Entity(path, writerIndex, entityLimitLength, idx); if (readerIndex == writerIndex) { @@ -83,82 +79,81 @@ public class FSQueue { } else { readerHandle = new Entity(path, readerIndex, entityLimitLength, idx); } - } - - /** - * 一个文件的数据写入达到fileLimitLength的时候,滚动到下一个文件实例 - * - * @throws IOException - * @throws FileFormatException - */ - private void rotateNextLogWriter() throws IOException, FileFormatException { - writerIndex = writerIndex + 1; - writerHandle.putNextFileNumber(writerIndex); - if (readerHandle != writerHandle) { - writerHandle.close(); - } - idx.putWriterIndex(writerIndex); - writerHandle = new Entity(path, writerIndex, entityLimitLength, idx, true); - } - - /** - * 向队列存储添加一个字符串 - * - * @param message - * message - * @throws IOException - * @throws FileFormatException - */ - public void add(String message) throws IOException, FileFormatException { - add(message.getBytes()); - } - - /** - * 向队列存储添加一个byte数组 - * - * @param message - * @throws IOException - * @throws FileFormatException - */ - public void add(byte[] message) throws IOException, FileFormatException { - short status = writerHandle.write(message); - if (status == Entity.WRITEFULL) { - rotateNextLogWriter(); - status = writerHandle.write(message); - } - if (status == Entity.WRITESUCCESS) { - idx.incrementSize(); - } - } + } + + /** + * 一个文件的数据写入达到fileLimitLength的时候,滚动到下一个文件实例 + * + * @throws IOException + * @throws FileFormatException + */ + private void rotateNextLogWriter() throws IOException, FileFormatException { + writerIndex = writerIndex + 1; + writerHandle.putNextFileNumber(writerIndex); + if (readerHandle != writerHandle) { + writerHandle.close(); + } + idx.putWriterIndex(writerIndex); + writerHandle = new Entity(path, writerIndex, entityLimitLength, idx, true); + } + + /** + * 向队列存储添加一个字符串 + * + * @param message message + * @throws IOException + * @throws FileFormatException + */ + public void add(String message) throws IOException, FileFormatException { + add(message.getBytes()); + } + + /** + * 向队列存储添加一个byte数组 + * + * @param message + * @throws IOException + * @throws FileFormatException + */ + public void add(byte[] message) throws IOException, FileFormatException { + short status = writerHandle.write(message); + if (status == Entity.WRITEFULL) { + rotateNextLogWriter(); + status = writerHandle.write(message); + } + if (status == Entity.WRITESUCCESS) { + idx.incrementSize(); + } + } private byte[] read(boolean commit) throws IOException, FileFormatException { - byte[] bytes = null; - try { - bytes = readerHandle.read(commit); - } catch (FileEOFException e) { - int nextFileNumber = readerHandle.getNextFileNumber(); + byte[] bytes; + try { + bytes = readerHandle.read(commit); + } catch (FileEOFException e) { + int nextFileNumber = readerHandle.getNextFileNumber(); readerHandle.reset(); File deleteFile = readerHandle.getFile(); - readerHandle.close(); + readerHandle.close(); deleteFile.delete(); // 更新下一次读取的位置和索引 - idx.putReaderPosition(Entity.MESSAGE_START_POSITION); - idx.putReaderIndex(nextFileNumber); - if (writerHandle.getCurrentFileNumber() == nextFileNumber) { - readerHandle = writerHandle; - } else { - readerHandle = new Entity(path, nextFileNumber, entityLimitLength, idx); - } - try { - bytes = readerHandle.read(commit); - } catch (FileEOFException e1) { - throw new FileFormatException(e1); - } - } - if (bytes != null) { - idx.decrementSize(); - } - return bytes; + idx.putReaderPosition(Entity.MESSAGE_START_POSITION); + idx.putReaderIndex(nextFileNumber); + if (writerHandle.getCurrentFileNumber() == nextFileNumber) { + readerHandle = writerHandle; + } else { + readerHandle = new Entity(path, nextFileNumber, entityLimitLength, idx); + } + try { + bytes = readerHandle.read(commit); + } catch (FileEOFException e1) { + throw new FileFormatException(e1); + } + } + if (commit && bytes != null) { + idx.decrementSize(); + } + return bytes; } /** @@ -172,28 +167,29 @@ public class FSQueue { return read(false); } - /** - * 从队列存储中取出最先入队的数据,并移除它 - * @return - * @throws IOException - * @throws FileFormatException - */ - public byte[] readNextAndRemove() throws IOException, FileFormatException { + /** + * 从队列存储中取出最先入队的数据,并移除它 + * + * @return + * @throws IOException + * @throws FileFormatException + */ + public byte[] readNextAndRemove() throws IOException, FileFormatException { return read(true); - } - - public void clear() throws IOException, FileFormatException { - idx.clear(); - initHandle(); - } - - public void close() throws IOException { - readerHandle.close(); - writerHandle.close(); + } + + public void clear() throws IOException, FileFormatException { + idx.clear(); + initHandle(); + } + + public void close() throws IOException { + readerHandle.close(); + writerHandle.close(); idx.close(); - } + } - public int getQueueSize() { - return idx.getSize(); - } + public int getQueueSize() { + return idx.getSize(); + } } diff --git a/src/main/java/net/apexes/fqueue/exception/FileEOFException.java b/src/main/java/net/apexes/fqueue/exception/FileEOFException.java index 1169dc3fd65e355d15f4a5dc9cb35930eb898864..d49ff8b4224748c16754fbeca944a495e5136207 100644 --- a/src/main/java/net/apexes/fqueue/exception/FileEOFException.java +++ b/src/main/java/net/apexes/fqueue/exception/FileEOFException.java @@ -1,30 +1,31 @@ package net.apexes.fqueue.exception; /** - * * @author sunli */ public class FileEOFException extends Exception { - private static final long serialVersionUID = -1L; - - public FileEOFException() { - super(); - } - public FileEOFException(String message) { - super(message); - } - - public FileEOFException(String message, Throwable cause) { - super(message, cause); - } - - public FileEOFException(Throwable cause) { - super(cause); - } - @Override - public Throwable fillInStackTrace() { - return this; - } + private static final long serialVersionUID = -1L; + + public FileEOFException() { + super(); + } + + public FileEOFException(String message) { + super(message); + } + + public FileEOFException(String message, Throwable cause) { + super(message, cause); + } + + public FileEOFException(Throwable cause) { + super(cause); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } } diff --git a/src/main/java/net/apexes/fqueue/exception/FileFormatException.java b/src/main/java/net/apexes/fqueue/exception/FileFormatException.java index 149424911d94d20611002a956b4ad39d4790139b..6bf40094d2b2d7f13486f9d1f2be496065e9352a 100644 --- a/src/main/java/net/apexes/fqueue/exception/FileFormatException.java +++ b/src/main/java/net/apexes/fqueue/exception/FileFormatException.java @@ -1,33 +1,32 @@ package net.apexes.fqueue.exception; /** - * * @author sunli */ public class FileFormatException extends Exception { - /** - * - */ - private static final long serialVersionUID = -1L; + /** + * + */ + private static final long serialVersionUID = -1L; - /** - * Constructs an {@code FileFormatException} with {@code null} as its error - * detail message. - */ - public FileFormatException() { - super(); - } + /** + * Constructs an {@code FileFormatException} with {@code null} as its error + * detail message. + */ + public FileFormatException() { + super(); + } - public FileFormatException(String message) { - super(message); - } + public FileFormatException(String message) { + super(message); + } - public FileFormatException(String message, Throwable cause) { - super(message, cause); - } + public FileFormatException(String message, Throwable cause) { + super(message, cause); + } - public FileFormatException(Throwable cause) { - super(cause); - } + public FileFormatException(Throwable cause) { + super(cause); + } } diff --git a/src/main/java/net/apexes/fqueue/internal/Entity.java b/src/main/java/net/apexes/fqueue/internal/Entity.java index 5942c30dc2fe69f86e7664a27799c48e24b4c3c6..175330670cdafcbe266036acbfca7b0881dce585 100644 --- a/src/main/java/net/apexes/fqueue/internal/Entity.java +++ b/src/main/java/net/apexes/fqueue/internal/Entity.java @@ -47,7 +47,7 @@ public class Entity { } public Entity(String path, int fileNumber, int fileLimitLength, - Index idx, boolean create) throws IOException, FileFormatException { + Index idx, boolean create) throws IOException, FileFormatException { this.currentFileNumber = fileNumber; this.fileLimitLength = fileLimitLength; this.idx = idx; @@ -57,7 +57,7 @@ public class Entity { createLogEntity(); } else { raFile = new RandomAccessFile(file, "rwd"); - int fileLength = (int)raFile.length(); + int fileLength = (int) raFile.length(); if (fileLength < MESSAGE_START_POSITION) { throw new FileFormatException("file format error"); } @@ -172,7 +172,6 @@ public class Entity { } /** - * * @param commit 如果为 false,则只读取数据,不移动读取指针。 * @return * @throws IOException @@ -238,24 +237,24 @@ public class Entity { } private class Sync implements Runnable { - @Override - public void run() { - while (true) { - if (mappedByteBuffer != null) { - try { - mappedByteBuffer.force(); - } catch (Exception e) { - break; - } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - break; - } - } else { - break; - } - } - } - } + @Override + public void run() { + while (true) { + if (mappedByteBuffer != null) { + try { + mappedByteBuffer.force(); + } catch (Exception e) { + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + break; + } + } else { + break; + } + } + } + } } diff --git a/src/main/java/net/apexes/fqueue/internal/Index.java b/src/main/java/net/apexes/fqueue/internal/Index.java index c25130b2312f8ff330040a2ee5dfbfaae7be2ad6..cbd7aa90dc35c8190686f271d472f68436944b5f 100644 --- a/src/main/java/net/apexes/fqueue/internal/Index.java +++ b/src/main/java/net/apexes/fqueue/internal/Index.java @@ -18,197 +18,197 @@ import net.apexes.fqueue.exception.FileFormatException; * @author sunli */ public class Index { - private static final int INDEX_LIMIT_LENGTH = 32; + private static final int INDEX_LIMIT_LENGTH = 32; private static final String INDEX_FILE_NAME = "fq.idx"; - private RandomAccessFile dbRandFile = null; - private FileChannel fc; - private MappedByteBuffer mappedByteBuffer; - - /** - * 文件操作位置信息 - */ - private String magicString = null; - private int version = -1; - private int readerPosition = -1; - private int writerPosition = -1; - private int readerIndex = -1; - private int writerIndex = -1; - private AtomicInteger size = new AtomicInteger(); - - public Index(String path) throws IOException, FileFormatException { - File dbFile = new File(path, INDEX_FILE_NAME); - - // 文件不存在,创建文件 - if (dbFile.exists() == false) { - dbFile.createNewFile(); - dbRandFile = new RandomAccessFile(dbFile, "rwd"); - initIdxFile(); - } else { - dbRandFile = new RandomAccessFile(dbFile, "rwd"); - if (dbRandFile.length() < INDEX_LIMIT_LENGTH) { - throw new FileFormatException("file format error."); - } - byte[] bytes = new byte[INDEX_LIMIT_LENGTH]; - dbRandFile.read(bytes); + private RandomAccessFile dbRandFile; + private FileChannel fc; + private MappedByteBuffer mappedByteBuffer; + + /** + * 文件操作位置信息 + */ + private String magicString = null; + private int version = -1; + private int readerPosition = -1; + private int writerPosition = -1; + private int readerIndex = -1; + private int writerIndex = -1; + private AtomicInteger size = new AtomicInteger(); + + public Index(String path) throws IOException, FileFormatException { + File dbFile = new File(path, INDEX_FILE_NAME); + + // 文件不存在,创建文件 + if (dbFile.exists() == false) { + dbFile.createNewFile(); + dbRandFile = new RandomAccessFile(dbFile, "rwd"); + initIdxFile(); + } else { + dbRandFile = new RandomAccessFile(dbFile, "rwd"); + if (dbRandFile.length() < INDEX_LIMIT_LENGTH) { + throw new FileFormatException("file format error."); + } + byte[] bytes = new byte[INDEX_LIMIT_LENGTH]; + dbRandFile.read(bytes); ByteBuffer buffer = ByteBuffer.wrap(bytes); - bytes = new byte[Entity.MAGIC.getBytes().length]; - buffer.get(bytes); - magicString = new String(bytes); - version = buffer.getInt(); - readerPosition = buffer.getInt(); - writerPosition = buffer.getInt(); - readerIndex = buffer.getInt(); - writerIndex = buffer.getInt(); + bytes = new byte[Entity.MAGIC.getBytes().length]; + buffer.get(bytes); + magicString = new String(bytes); + version = buffer.getInt(); + readerPosition = buffer.getInt(); + writerPosition = buffer.getInt(); + readerIndex = buffer.getInt(); + writerIndex = buffer.getInt(); int sz = buffer.getInt(); if (readerPosition == writerPosition && readerIndex == writerIndex && sz <= 0) { - initIdxFile(); + initIdxFile(); } else { size.set(sz); } - } + } fc = dbRandFile.getChannel(); mappedByteBuffer = fc.map(MapMode.READ_WRITE, 0, INDEX_LIMIT_LENGTH); - } + } private void initIdxFile() throws IOException { - magicString = Entity.MAGIC; - version = 1; - readerPosition = Entity.MESSAGE_START_POSITION; - writerPosition = Entity.MESSAGE_START_POSITION; - readerIndex = 1; - writerIndex = 1; + magicString = Entity.MAGIC; + version = 1; + readerPosition = Entity.MESSAGE_START_POSITION; + writerPosition = Entity.MESSAGE_START_POSITION; + readerIndex = 1; + writerIndex = 1; dbRandFile.setLength(32); dbRandFile.seek(0); dbRandFile.write(magicString.getBytes());// magic dbRandFile.writeInt(version);// 8 version - dbRandFile.writeInt(readerPosition);// 12 reader position - dbRandFile.writeInt(writerPosition);// 16 write position - dbRandFile.writeInt(readerIndex);// 20 reader index - dbRandFile.writeInt(writerIndex);// 24 writer index - dbRandFile.writeInt(0);// 28 size + dbRandFile.writeInt(readerPosition);// 12 reader position + dbRandFile.writeInt(writerPosition);// 16 write position + dbRandFile.writeInt(readerIndex);// 20 reader index + dbRandFile.writeInt(writerIndex);// 24 writer index + dbRandFile.writeInt(0);// 28 size } - + public void clear() throws IOException { mappedByteBuffer.clear(); mappedByteBuffer.force(); initIdxFile(); } - /** - * 记录写位置 - * - * @param pos - */ - public void putWriterPosition(int pos) { - mappedByteBuffer.position(16); - mappedByteBuffer.putInt(pos); - this.writerPosition = pos; - } - - /** - * 记录读取的位置 - * - * @param pos - */ - public void putReaderPosition(int pos) { - mappedByteBuffer.position(12); - mappedByteBuffer.putInt(pos); - this.readerPosition = pos; - } - - /** - * 记录写文件索引 - * - * @param index - */ - public void putWriterIndex(int index) { - mappedByteBuffer.position(24); - mappedByteBuffer.putInt(index); - this.writerIndex = index; - } - - /** - * 记录读取文件索引 - * - * @param index - */ - public void putReaderIndex(int index) { - mappedByteBuffer.position(20); - mappedByteBuffer.putInt(index); - this.readerIndex = index; - } - - public void incrementSize() { - int num = size.incrementAndGet(); - mappedByteBuffer.position(28); - mappedByteBuffer.putInt(num); - } - - public void decrementSize() { - int num = size.decrementAndGet(); - mappedByteBuffer.position(28); - mappedByteBuffer.putInt(num); - } - - public String getMagicString() { - return magicString; - } - - public int getVersion() { - return version; - } - - public int getReaderPosition() { - return readerPosition; - } - - public int getWriterPosition() { - return writerPosition; - } - - public int getReaderIndex() { - return readerIndex; - } - - public int getWriterIndex() { - return writerIndex; - } - - public int getSize() { - return size.get(); - } - - /** - * 关闭索引文件 - */ - public void close() throws IOException { - mappedByteBuffer.force(); + /** + * 记录写位置 + * + * @param pos + */ + public void putWriterPosition(int pos) { + mappedByteBuffer.position(16); + mappedByteBuffer.putInt(pos); + this.writerPosition = pos; + } + + /** + * 记录读取的位置 + * + * @param pos + */ + public void putReaderPosition(int pos) { + mappedByteBuffer.position(12); + mappedByteBuffer.putInt(pos); + this.readerPosition = pos; + } + + /** + * 记录写文件索引 + * + * @param index + */ + public void putWriterIndex(int index) { + mappedByteBuffer.position(24); + mappedByteBuffer.putInt(index); + this.writerIndex = index; + } + + /** + * 记录读取文件索引 + * + * @param index + */ + public void putReaderIndex(int index) { + mappedByteBuffer.position(20); + mappedByteBuffer.putInt(index); + this.readerIndex = index; + } + + public void incrementSize() { + int num = size.incrementAndGet(); + mappedByteBuffer.position(28); + mappedByteBuffer.putInt(num); + } + + public void decrementSize() { + int num = size.decrementAndGet(); + mappedByteBuffer.position(28); + mappedByteBuffer.putInt(num); + } + + public String getMagicString() { + return magicString; + } + + public int getVersion() { + return version; + } + + public int getReaderPosition() { + return readerPosition; + } + + public int getWriterPosition() { + return writerPosition; + } + + public int getReaderIndex() { + return readerIndex; + } + + public int getWriterIndex() { + return writerIndex; + } + + public int getSize() { + return size.get(); + } + + /** + * 关闭索引文件 + */ + public void close() throws IOException { + mappedByteBuffer.force(); mappedByteBuffer.clear(); fc.close(); dbRandFile.close(); mappedByteBuffer = null; fc = null; dbRandFile = null; - } - - public String headerInfo() { - StringBuilder sb = new StringBuilder(); - sb.append(" magicString:"); - sb.append(magicString); - sb.append(" version:"); - sb.append(version); - sb.append(" readerPosition:"); - sb.append(readerPosition); - sb.append(" writerPosition:"); - sb.append(writerPosition); - sb.append(" size:"); - sb.append(size); - sb.append(" readerIndex:"); - sb.append(readerIndex); - sb.append(" writerIndex:"); - sb.append(writerIndex); - return sb.toString(); - } + } + + public String headerInfo() { + StringBuilder sb = new StringBuilder(); + sb.append(" magicString:"); + sb.append(magicString); + sb.append(" version:"); + sb.append(version); + sb.append(" readerPosition:"); + sb.append(readerPosition); + sb.append(" writerPosition:"); + sb.append(writerPosition); + sb.append(" size:"); + sb.append(size); + sb.append(" readerIndex:"); + sb.append(readerIndex); + sb.append(" writerIndex:"); + sb.append(writerIndex); + return sb.toString(); + } } diff --git a/src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java b/src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java new file mode 100644 index 0000000000000000000000000000000000000000..8748b554047e1aed01c6ef4c33144cb554f15649 --- /dev/null +++ b/src/test/java/net/apexes/fqueue/demo/FQueueDemo2.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2018, apexes.net. All rights reserved. + * + * http://www.apexes.net + * + */ +package net.apexes.fqueue.demo; + +import net.apexes.fqueue.FQueue; + +/** + * @author HeDYn + */ +public class FQueueDemo2 { + + public static void main(String[] args) throws Exception { + FQueue queue = new FQueue(".temp/test-fqueue"); + + queue.offer("abc".getBytes()); + System.out.println("label1: " + queue.size()); + + String data2 = new String(queue.peek()); + System.out.println("label2: " + queue.size()); + System.out.println("label2: data2 -> " + data2); + + String data3 = new String(queue.remove()); + System.out.println("label3: " + queue.size()); + System.out.println("label3: data3 -> " + data3); + } +}