Changeset 594 for Tests


Ignore:
Timestamp:
12/10/13 17:39:29 (8 years ago)
Author:
BegemoT
Message:
 
Location:
Tests/JAVA/logger
Files:
18 added
1 deleted
13 edited
2 copied
4 moved

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/logger/scripts/batch.sh

    r593 r594  
    33for PAYLOAD in 0 20 40 60 80 120 200 
    44do 
    5         java -server -Xmx512M -da -Dwriter-backoff=$PAYLOAD -Dcells-per-record=32 -jar target/benchmarks.jar ".*$1.*" -f -w 2 -wi 5 -i 5 -r 5 -gc true >> "data/$1.results" 
     5        java -server -Xmx512M -da -Dwriter-backoff=$PAYLOAD -Dcells-per-record=8 -jar target/benchmarks.jar ".*$1.*" -f -w 2 -wi 5 -i 5 -r 10 -gc true >> "data/$1.results" 
    66done 
     7 
     8#for CELLS in 4 8 16 32 63 
     9#do 
     10#       java -server -Xmx512M -da -Dwriter-backoff=80 -Dcells-per-record=$CELLS -jar target/benchmarks.jar ".*$1.*" -f -w 2 -wi 5 -i 5 -r 10 -gc true >> "data/$1.cells.results" 
     11#done 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/PlaygroundMain.java

    r589 r594  
    55import java.util.concurrent.TimeUnit; 
    66 
    7 import com.db.logger.api.impl.logger.CircularBuffer; 
    8 import com.db.logger.api.impl.logger.DemultiplexingSequencer; 
     7import com.db.logger.api.impl.logger.RingBuffer; 
     8import com.db.logger.api.impl.logger.MCSDSequencer; 
    99import com.db.logger.api.impl.logger.formatters.SimpleLogMessage; 
    1010import com.db.logger.api.impl.logger.buffer.PlainLongsBuffer; 
     
    3232                final int length = ( 1 << 14 ); 
    3333                final PlainLongsBuffer buffer = new PlainLongsBuffer( length ); 
    34                 final DemultiplexingSequencer sequencer = new DemultiplexingSequencer( length ); 
     34                final MCSDSequencer sequencer = new MCSDSequencer( length ); 
    3535 
    3636                final int workers = Runtime.getRuntime().availableProcessors() - 1; 
     
    4141                        public void run() { 
    4242                                for(; ; ) { 
    43                                         sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
     43                                        sequencer.drainTo( new MCSDSequencer.Drainer() { 
    4444                                                @Override 
    4545                                                public int available( final long startSequence, 
     
    5858                                        formatId, 
    5959                                        2, 
    60                                         new CircularBuffer( sequencer, buffer, SPINNING ) 
     60                                        new RingBuffer( sequencer, buffer, SPINNING ) 
    6161                        ); 
    6262                        workersPool.submit( 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/FastLoggerImpl.java

    r590 r594  
    99import com.db.logger.api.LogMessage; 
    1010import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
    11 import com.db.logger.api.impl.logger.formatters.FlyweightLogMessage; 
    1211import com.db.logger.api.impl.logger.formatters.RawLogMessage; 
    1312import com.db.logger.api.impl.logger.formatters.SimpleLogMessage; 
     
    3130        private final ThreadFactory threadFactory; 
    3231 
    33         private final CircularBuffer circularBuffer; 
     32        private final RingBuffer ringBuffer; 
    3433 
    3534        private final RawWriter writer; 
     
    4948 
    5049                final int length = buffer.length(); 
    51                 this.circularBuffer = new CircularBuffer( 
    52                                 new DemultiplexingSequencer( length ), 
     50                this.ringBuffer = new RingBuffer( 
     51                                new MCSDSequencer( length ), 
    5352                                buffer, 
    5453                                waitingStrategy 
     
    7271                                messageInfo.formatId, 
    7372                                messageInfo.argumentsCount, 
    74                                 circularBuffer 
     73                                ringBuffer 
    7574                ); 
    7675        } 
     
    7978                @Override 
    8079                protected RawLogMessage initialValue() { 
    81                         return new RawLogMessage( circularBuffer ); 
     80                        return new RawLogMessage( ringBuffer ); 
    8281                } 
    8382        }; 
     
    9190        } 
    9291 
    93         private final THashMap<String, MessageInfo> formats = new THashMap<String, MessageInfo>( 128 ); 
     92        private final MessagesCatalog messages = new MessagesCatalog( 2048 ); 
     93 
     94        public MessageInfo lookupMessageInfo( final String messageFormat ) { 
     95                return messages.lookupMessageInfo( messageFormat ); 
     96        } 
    9497 
    9598        public LogMessage message( final String messageFormat ) { 
     
    99102        } 
    100103 
    101         public MessageInfo lookupMessageInfo( final String messageFormat ) { 
    102                 //TODO RC: dcl, ok for testing, but needs rewrite 
    103                 MessageInfo messageInfo = formats.get( messageFormat ); 
    104                 if( messageInfo == null ) { 
    105                         synchronized( formats ) { 
    106                                 messageInfo = formats.get( messageFormat ); 
    107                                 if( messageInfo == null ) { 
    108                                         checkArgument( messageFormat != null, "messageFormat can't be null" ); 
    109                                         messageInfo = new MessageInfo( 
    110                                                         messageFormat, 
    111                                                         calculateArgumentsCount( messageFormat ), 
    112                                                         formats.size() + 1 
    113                                         ); 
    114                                         formats.put( messageFormat, messageInfo ); 
    115                                 } 
    116                         } 
    117                 } 
    118                 return messageInfo; 
    119         } 
    120  
    121         private static int calculateArgumentsCount( final String formatMessage ) { 
    122                 int argumentsCount = 0; 
    123                 for( int i = 0; i < formatMessage.length(); i++ ) { 
    124                         if( formatMessage.charAt( i ) == '%' ) {//TODO RC: not safe! '%%' is not placeholder 
    125                                 argumentsCount++; 
    126                         } 
    127                 } 
    128                 return argumentsCount; 
    129         } 
    130  
    131104        private Thread drainerThread = null; 
    132105 
     
    134107                if( drainerThread == null ) { 
    135108                        final Drainer drainer = new Drainer( 
    136                                         circularBuffer, new ConsumingDrainer( 
    137                                         circularBuffer.buffer(), 
     109                                        ringBuffer, new ConsumingDrainer( 
     110                                        ringBuffer.buffer(), 
    138111                                        writer 
    139112                        ) 
     
    165138//      } 
    166139 
    167         private static class Drainer implements Runnable, DemultiplexingSequencer.Drainer { 
     140        private static class Drainer implements Runnable, MCSDSequencer.Drainer { 
    168141                //TODO setup! 
    169142                private static final int MIN_PERIOD_MS = 1; 
     
    173146 
    174147                private final ConsumingDrainer consumer; 
    175                 private final CircularBuffer circularBuffer; 
    176  
    177                 private Drainer( final CircularBuffer circularBuffer, 
     148                private final RingBuffer ringBuffer; 
     149 
     150                private Drainer( final RingBuffer ringBuffer, 
    178151                                 final ConsumingDrainer consumer ) { 
    179152                        this.consumer = consumer; 
    180                         this.circularBuffer = circularBuffer; 
     153                        this.ringBuffer = ringBuffer; 
    181154                } 
    182155 
     
    185158                        while( !Thread.interrupted() ) { 
    186159                                try { 
    187                                         circularBuffer.sequencer().drainTo( this ); 
    188  
    189                                         reportingPeriodMs = nextWaitPeriod( 
    190                                                         processedRecords, 
    191                                                         circularBuffer.length(), 
    192                                                         reportingPeriodMs 
    193                                         ); 
     160                                        ringBuffer.drainTo( this ); 
     161                                        Thread.yield(); 
     162//                                      reportingPeriodMs = nextWaitPeriod( 
     163//                                                      processedRecords, 
     164//                                                      ringBuffer.length(), 
     165//                                                      reportingPeriodMs 
     166//                                      ); 
    194167//                                      Thread.sleep( reportingPeriodMs ); 
    195168//                              } catch( InterruptedException e ) { 
     
    231204        } 
    232205 
    233         private static class ConsumingDrainer implements DemultiplexingSequencer.Drainer { 
     206        private static class ConsumingDrainer implements Sequencer.Drainer { 
    234207                private static final int SPINS_PER_TURN = 256; 
    235208 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/MCSDSequencer.java

    r589 r594  
    11package com.db.logger.api.impl.logger; 
    22 
    3 import java.lang.reflect.Field; 
    4  
    5 import sun.misc.Unsafe; 
    6  
    7 import static com.google.common.base.Preconditions.checkArgument; 
    8 import static com.google.common.base.Preconditions.checkState; 
    93 
    104/** 
     
    1610 * Client code must involve some kind of protocol to ensure that entry filling with 
    1711 * data was finished, since sequencer itself takes responsibility only on spreading 
    18  * indexes to producers. {@link #drainTo(com.db.logger.api.impl.logger.DemultiplexingSequencer.Drainer)} 
     12 * indexes to producers. {@link #drainTo(MCSDSequencer.Drainer)} 
    1913 * will supply all indexes which was given by {@link #claim} at moment, 
    2014 * but not all of associated records are full with data. You need some kind of flag 
     
    2418 *         created 7/4/12 at 3:54 PM 
    2519 */ 
    26 public class DemultiplexingSequencer { 
    27         public static final long INVALID_INDEX = -1L; 
     20public class MCSDSequencer extends Sequencer { 
    2821 
    29         private static final Unsafe UNSAFE = UnsafeHelper.unsafe(); 
    30         private static final long HEAD_OFFSET; 
    31         private static final long TAIL_OFFSET; 
    32  
    33         static { 
    34                 try { 
    35                         final Field headCursorField = DemultiplexingSequencer.class.getDeclaredField( "headCursor" ); 
    36                         final Field tailCursorField = DemultiplexingSequencer.class.getDeclaredField( "tailCursor" ); 
    37                         HEAD_OFFSET = UNSAFE.objectFieldOffset( headCursorField ); 
    38                         TAIL_OFFSET = UNSAFE.objectFieldOffset( tailCursorField ); 
    39                         final long padding = TAIL_OFFSET - HEAD_OFFSET; 
    40                         if( padding < 64 ) { 
    41                                 System.out.println( "Padding is optimized out: " + padding ); 
    42                         } 
    43                 } catch( NoSuchFieldException e ) { 
    44                         throw new RuntimeException( e ); 
    45                 } 
     22        public MCSDSequencer( final int length ) { 
     23                super( length ); 
    4624        } 
    4725 
    48         private final int length; 
    4926 
    50         /** 
    51          * Elements range: [headCursor, tailCursor) 
    52          * <p/> 
    53          * (tailCursor - headCursor) == elements count 
    54          * <p/> 
    55          * 0 <= (tailCursor - headCursor) <= length  => state invariant 
    56          * <p/> 
    57          * tailCursor - headCursor == length         => buffer is full 
    58          * tailCursor - headCursor == 0              => buffer is empty 
    59          * <p/> 
    60          * (headCursor % size ) is the index of first item in buffer 
    61          * (tailCursor % size ) is the index of _cell_ for _next last item_ 
    62          */ 
    63         public volatile long r0, r1, r2, r3, r4, r5, r6, r7; 
    64         private volatile long headCursor = 0; 
    65         public volatile long p0, p1, p2, p3, p4, p5, p6, p7; 
    66         public volatile long q0, q1, q2, q3, q4, q5, q6, q7; 
    67         private volatile long tailCursor = 0; 
    68         public volatile long z0, z1, z2, z3, z4, z5, z6, z7; 
    69  
    70         public DemultiplexingSequencer( final int length ) { 
    71                 checkArgument( length > 0, "length(%s) must be > 0", length ); 
    72                 checkArgument( ( length & ( length - 1 ) ) == 0, 
    73                                "length %s should be power of 2", length ); 
    74  
    75                 this.length = length; 
    76         } 
    77  
    78         /** @return -1, if not available */ 
    79         public long claim() { 
    80                 return claim( 1 ); 
    81         } 
    82  
    83         public long claim( final int size ) { 
    84                 return claim( size, WaitingStrategy.NO_WAIT ); 
    85         } 
    86  
     27        @Override 
    8728        public long claim( final int size, 
    8829                           final WaitingStrategy waitingStrategy ) { 
     
    10243                } 
    10344        } 
    104  
    105         /** 
    106          * It is single-threaded method: it must be called from one thread, or be 
    107          * protected by external mutex. 
    108          */ 
    109         public void drainTo( final Drainer drainer ) { 
    110                 checkArgument( drainer != null, "drainer can't be null" ); 
    111  
    112                 //remember: claimed indexes are all in [headCursor, tailCursor) 
    113                 final long firstClaimed = headCursor; 
    114                 final long sentinelIndex = tailCursor; 
    115                 if( sentinelIndex > firstClaimed ) { 
    116                         final long reclaimedIndexes = drainer.available( 
    117                                         firstClaimed, 
    118                                         sentinelIndex 
    119                         ); 
    120                         final long maxForReclaim = sentinelIndex - firstClaimed; 
    121                         checkState( reclaimedIndexes >= 0 && reclaimedIndexes <= maxForReclaim, 
    122                                     "Can't reclaim %s indexes: only [0,%s] available", 
    123                                     reclaimedIndexes, maxForReclaim 
    124                         ); 
    125                         UNSAFE.putOrderedLong( this, HEAD_OFFSET, firstClaimed + reclaimedIndexes ); 
    126                 } 
    127         } 
    128  
    129  
    130         public interface Drainer { 
    131                 /** 
    132                  * Sequence range [startSequence, sentinelSequence) is claimed by sequencer, 
    133                  * and may be reclaimed 
    134                  * 
    135                  * @return number of sequences reclaimed. I.e [startSequence, startSequence+length] 
    136                  *         would be available for reuse. Max value allowed to return is 
    137                  *         (sentinelSequence-startSequence) 
    138                  */ 
    139  
    140                 public int available( final long startSequence, 
    141                                       final long sentinelSequence ); 
    142         } 
    14345} 
    144  
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/RingBuffer.java

    r589 r594  
    77 *         created 05.12.13 at 0:41 
    88 */ 
    9 public final class CircularBuffer { 
     9public final class RingBuffer { 
    1010        private final ILongsBuffer buffer; 
    11         private final DemultiplexingSequencer sequencer; 
     11        private final Sequencer sequencer; 
    1212        private final WaitingStrategy waitingStrategy; 
    1313 
    14         public CircularBuffer( final DemultiplexingSequencer sequencer, 
    15                                final ILongsBuffer buffer, 
    16                                final WaitingStrategy waitingStrategy ) { 
     14        public RingBuffer( final Sequencer sequencer, 
     15                           final ILongsBuffer buffer, 
     16                           final WaitingStrategy waitingStrategy ) { 
    1717                this.waitingStrategy = waitingStrategy; 
    1818                this.sequencer = sequencer; 
     
    2828        } 
    2929 
    30         public DemultiplexingSequencer sequencer() { 
    31                 return sequencer; 
    32         } 
    33  
    34         public WaitingStrategy waitingStrategy() { 
    35                 return waitingStrategy; 
     30        public void drainTo( final Sequencer.Drainer drainer ) { 
     31                sequencer.drainTo( drainer ); 
    3632        } 
    3733 
    3834        public long claim( final int size ) { 
    39                 return sequencer().claim( 
     35                return sequencer.claim( 
    4036                                size, 
    4137                                waitingStrategy 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/formatters/AbstractLogBuilder.java

    r590 r594  
    22 
    33import com.db.logger.api.FluentLogBuilder; 
    4 import com.db.logger.api.LogMessage; 
    5 import com.db.logger.api.impl.logger.CircularBuffer; 
     4import com.db.logger.api.impl.logger.RingBuffer; 
    65import com.db.logger.api.impl.logger.RecordHelper; 
    76import net.jcip.annotations.NotThreadSafe; 
    87 
    9 import static com.db.logger.api.impl.logger.DemultiplexingSequencer.INVALID_INDEX; 
     8import static com.db.logger.api.impl.logger.MCSDSequencer.INVALID_INDEX; 
    109import static com.db.logger.api.impl.logger.RecordHelper.RecordType.LOG_RECORD; 
    1110import static com.google.common.base.Preconditions.checkArgument; 
     
    2019        public static final int NOT_SET = -1; 
    2120 
    22         private final CircularBuffer buffer; 
     21        private final RingBuffer buffer; 
    2322 
    2423        protected int argumentIndex = NOT_SET; 
    2524        protected long position = INVALID_INDEX; 
    2625 
    27         public AbstractLogBuilder( final CircularBuffer buffer ) { 
     26        public AbstractLogBuilder( final RingBuffer buffer ) { 
    2827                checkArgument( buffer != null, "buffer can't be null" ); 
    2928                this.buffer = buffer; 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/formatters/FlyweightLogMessage.java

    r590 r594  
    22 
    33import com.db.logger.api.LogMessage; 
    4 import com.db.logger.api.impl.logger.CircularBuffer; 
     4import com.db.logger.api.impl.logger.RingBuffer; 
    55import com.db.logger.api.impl.logger.MessageInfo; 
    66 
     
    1818        private MessageInfo messageInfo = null; 
    1919 
    20         public FlyweightLogMessage( final CircularBuffer circularBuffer ) { 
    21                 super( circularBuffer ); 
     20        public FlyweightLogMessage( final RingBuffer ringBuffer ) { 
     21                super( ringBuffer ); 
    2222        } 
    2323 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/formatters/RawLogMessage.java

    r590 r594  
    33import com.db.logger.api.LogMessage; 
    44import com.db.logger.api.FluentLogBuilder; 
    5 import com.db.logger.api.impl.logger.CircularBuffer; 
     5import com.db.logger.api.impl.logger.RingBuffer; 
    66import com.db.logger.api.impl.logger.MessageInfo; 
    77import com.db.logger.api.impl.logger.RecordHelper; 
    88 
    99import static com.db.logger.api.impl.logger.formatters.AbstractLogBuilder.NOT_SET; 
    10 import static com.db.logger.api.impl.logger.DemultiplexingSequencer.INVALID_INDEX; 
     10import static com.db.logger.api.impl.logger.MCSDSequencer.INVALID_INDEX; 
    1111import static com.db.logger.api.impl.logger.RecordHelper.RecordType.LOG_RECORD; 
    1212import static com.google.common.base.Preconditions.checkArgument; 
     
    1919public final class RawLogMessage implements LogMessage, FluentLogBuilder { 
    2020 
    21         private final CircularBuffer buffer; 
     21        private final RingBuffer buffer; 
    2222 
    2323        private int argumentIndex = NOT_SET; 
     
    2929        private int argumentsCount; 
    3030 
    31         public RawLogMessage( final CircularBuffer circularBuffer ) { 
    32                 this.buffer = circularBuffer; 
     31        public RawLogMessage( final RingBuffer ringBuffer ) { 
     32                this.buffer = ringBuffer; 
    3333        } 
    3434 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/formatters/SimpleLogMessage.java

    r590 r594  
    11package com.db.logger.api.impl.logger.formatters; 
    22 
    3 import com.db.logger.api.FluentLogBuilder; 
    43import com.db.logger.api.LogMessage; 
    5 import com.db.logger.api.impl.logger.CircularBuffer; 
     4import com.db.logger.api.impl.logger.RingBuffer; 
    65import net.jcip.annotations.NotThreadSafe; 
    76 
     
    2423                                 final int formatId, 
    2524                                 final int argumentsCount, 
    26                                  final CircularBuffer circularBuffer ) { 
    27                 super( circularBuffer ); 
     25                                 final RingBuffer ringBuffer ) { 
     26                super( ringBuffer ); 
    2827                this.format = format; 
    2928 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/formatters/SimpleLogMessageExpanded.java

    r590 r594  
    33import com.db.logger.api.FluentLogBuilder; 
    44import com.db.logger.api.LogMessage; 
    5 import com.db.logger.api.impl.logger.CircularBuffer; 
     5import com.db.logger.api.impl.logger.RingBuffer; 
    66import com.db.logger.api.impl.logger.RecordHelper; 
    77 
    8 import static com.db.logger.api.impl.logger.DemultiplexingSequencer.INVALID_INDEX; 
     8import static com.db.logger.api.impl.logger.MCSDSequencer.INVALID_INDEX; 
    99import static com.db.logger.api.impl.logger.RecordHelper.RecordType.LOG_RECORD; 
    1010import static com.db.logger.api.impl.logger.formatters.AbstractLogBuilder.NOT_SET; 
     
    1717public final class SimpleLogMessageExpanded implements LogMessage, FluentLogBuilder { 
    1818 
    19         private final CircularBuffer buffer; 
     19        private final RingBuffer buffer; 
    2020 
    2121        private int argumentIndex = NOT_SET; 
     
    3030                                         final int formatId, 
    3131                                         final int argumentsCount, 
    32                                          final CircularBuffer circularBuffer ) { 
    33                 this.buffer = circularBuffer; 
     32                                         final RingBuffer ringBuffer ) { 
     33                this.buffer = ringBuffer; 
    3434                this.format = format; 
    3535 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/BufferWriteAndDrainBenchmark.java

    r593 r594  
    44import java.util.concurrent.atomic.AtomicInteger; 
    55 
    6 import com.db.logger.api.impl.logger.DemultiplexingSequencer; 
    7 import com.db.logger.api.impl.logger.RecordHelper; 
     6import com.db.logger.api.impl.logger.*; 
    87import com.db.logger.api.impl.logger.buffer.DirectAccessLongBuffer; 
    98import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
    10 import com.db.logger.api.impl.logger.WaitingStrategy; 
    119import org.openjdk.jmh.annotations.*; 
    1210import org.openjdk.jmh.logic.BlackHole; 
    1311 
    1412import static com.db.logger.api.impl.logger.RecordHelper.NOT_SET; 
     13import static com.db.logger.api.impl.logger.RecordHelper.RecordType.LOG_RECORD; 
     14import static com.db.logger.api.impl.logger.RecordHelper.header; 
    1515 
    1616/** 
     
    3030        public static final AtomicInteger ID_GENERATOR = new AtomicInteger( 1 ); 
    3131 
    32         public static final DemultiplexingSequencer.Drainer DRAIN_DUMMY = new DemultiplexingSequencer.Drainer() { 
     32        public static final Sequencer.Drainer DRAIN_DUMMY = new Sequencer.Drainer() { 
    3333                @Override 
    3434                public int available( final long startSequence, 
     
    4343 
    4444        public ILongsBuffer buffer; 
    45         public DemultiplexingSequencer sequencer; 
     45        public MCSDSequencer sequencer; 
    4646 
    47         public DemultiplexingSequencer.Drainer DRAIN_AND_READ; 
     47        public Sequencer.Drainer DRAIN_AND_READ; 
    4848 
    4949        @Setup 
    5050        public void setup() { 
    5151                final int length = 1 << LENGTH_POW; 
    52                 buffer = new DirectAccessLongBuffer( length, RecordHelper.NOT_SET ); 
    53                 sequencer = new DemultiplexingSequencer( length ); 
     52                buffer = new DirectAccessLongBuffer( length, NOT_SET ); 
     53                sequencer = new MCSDSequencer( length ); 
    5454 
    55                 DRAIN_AND_READ = new DemultiplexingSequencer.Drainer() { 
    56                         @Override 
    57                         public int available( final long startSequence, 
    58                                               final long sentinelSequence ) { 
    59                                 int spinsAvailable = 256; 
    60                                 for( long pos = startSequence; pos < sentinelSequence; pos++ ) { 
    61                                         if( ( pos % CELLS_PER_RECORD ) == 0 ) { 
    62                                                 for(; spinsAvailable >= 0; spinsAvailable-- ) { 
    63                                                         final long header = buffer.getVolatile( pos ); 
    64                                                         if( header > 0 ) { 
    65                                                                 buffer.put( pos, NOT_SET ); 
    66                                                                 break; 
    67                                                         } 
    68                                                 } 
    69                                                 if( spinsAvailable < 0 ) { 
    70                                                         return ( int ) ( pos - startSequence ); 
    71                                                 } 
    72                                         } else { 
    73                                                 buffer.get( pos ); 
    74                                         } 
    75                                 } 
    76                                 return ( int ) ( sentinelSequence - startSequence ); 
    77                         } 
    78                 }; 
     55                DRAIN_AND_READ = new ConsumingDrainer( buffer ); 
    7956        } 
    8057 
     
    157134                } 
    158135                //write header with SA 
    159                 buffer.putOrdered( position, writerId ); 
     136                final long header = header( LOG_RECORD, writerId, cellsCount - 1 ); 
     137                buffer.putOrdered( position, header ); 
    160138        } 
    161139 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/ConsumingDrainer.java

    r590 r594  
    11package com.db.logger.benchmarks; 
    22 
    3 import com.db.logger.api.impl.logger.DemultiplexingSequencer; 
     3import com.db.logger.api.impl.logger.MCSDSequencer; 
    44import com.db.logger.api.impl.logger.RecordHelper; 
    55import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
     
    1212 *         created 06.12.13 at 2:07 
    1313 */ 
    14 public class ConsumingDrainer implements DemultiplexingSequencer.Drainer { 
     14public class ConsumingDrainer implements MCSDSequencer.Drainer { 
    1515        private static final int SPINS_PER_TURN = 256; 
    1616 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/FastLoggerWithoutWriteBenchmark.java

    r590 r594  
    3030        private static final Logger log = Logger.getLogger( FastLoggerWithoutWriteBenchmark.class ); 
    3131        public static final int LENGTH = Integer.getInteger( "length", 1 << 14 ); 
     32        /** 
     33         * It's CELLS-1 arguments + 1 cell for header -- to be consistent with previous 
     34         * benchmarks 
     35         */ 
    3236        public static final int CELLS_PER_RECORD = Integer.getInteger( "cells-per-record", 8 );//8longs = 1 cache line 
    3337        public static final int WRITER_BACKOFF = Integer.getInteger( "writer-backoff", 20 ); 
     
    4347 
    4448        public ILongsBuffer buffer; 
    45         public DemultiplexingSequencer sequencer; 
     49        public MCSDSequencer sequencer; 
    4650 
    4751        public FastLoggerImpl logger; 
     
    5054        public void setup() { 
    5155                buffer = new DirectAccessLongBuffer( LENGTH, RecordHelper.NOT_SET ); 
    52                 sequencer = new DemultiplexingSequencer( LENGTH ); 
     56                sequencer = new MCSDSequencer( LENGTH ); 
    5357                logger = new FastLoggerImpl( 
    5458                                new ThreadFactory() { 
     
    97101                } 
    98102                FluentLogBuilder logBuilder = ts.simpleMessage.with( 5d ); 
    99                 final int count = CELLS_PER_RECORD; 
     103                final int count = CELLS_PER_RECORD - 1; 
    100104                for( int i = 1; i < count; i++ ) { 
    101105                        logBuilder = logBuilder.with( ( long ) i ); 
     
    115119                } 
    116120                FluentLogBuilder logBuilder = ts.threadLocalMessage.with( 5d ); 
    117                 final int count = CELLS_PER_RECORD; 
     121                final int count = CELLS_PER_RECORD - 1; 
    118122                for( int i = 1; i < count; i++ ) { 
    119123                        logBuilder = logBuilder.with( ( long ) i ); 
     
    128132        @Group( "logAndPayload" ) 
    129133        @GroupThreads( 3 ) 
    130         public void writeMessage( final ThreadState ts ) { 
     134        public void writeRawMessage( final ThreadState ts ) { 
    131135                FluentLogBuilder logBuilder = logger.message( ts.messageFormat ) 
    132136                                .with( 5d ); 
    133                 final int count = CELLS_PER_RECORD; 
    134                 for( int i = 1; i < count; i++ ) { 
    135                         logBuilder = logBuilder.with( ( long ) i ); 
    136                 } 
    137                 logBuilder.log(); 
    138  
    139                 BlackHole.consumeCPU( WRITER_BACKOFF ); 
    140         } 
    141  
    142 //      @GenerateMicroBenchmark 
    143 //      @Group( "lookupMessageInfo" ) 
    144 //      @GroupThreads( 3 ) 
    145 //      public Object lookupMessageInfo( final ThreadState ts ) { 
    146 //              return logger.lookupMessageInfo( ts.messageFormat ); 
    147 //      } 
     137                final int count = CELLS_PER_RECORD - 1; 
     138                for( int i = 1; i < count; i++ ) { 
     139                        logBuilder = logBuilder.with( ( long ) i ); 
     140                } 
     141                logBuilder.log(); 
     142 
     143                BlackHole.consumeCPU( WRITER_BACKOFF ); 
     144        } 
     145 
     146        @GenerateMicroBenchmark 
     147        @Group( "lookupMessageInfo" ) 
     148        @GroupThreads( 3 ) 
     149        public Object lookupMessageInfo( final ThreadState ts ) { 
     150                return logger.lookupMessageInfo( ts.messageFormat ); 
     151        } 
    148152 
    149153        @State( Scope.Thread ) 
     
    156160                        final StringBuilder sb = new StringBuilder(); 
    157161                        sb.append( id ); 
    158                         for( int i = 0; i < CELLS_PER_RECORD; i++ ) { 
     162                        //actually, it's CELLS-1 arguments + 1 cell for header -- to be consistent 
     163                        //with previous benchmarks 
     164                        for( int i = 1; i < CELLS_PER_RECORD; i++ ) { 
    159165                                sb.append( " %d" ); 
    160166                        } 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/MultiBuffersWriteAndDrainBenchmark.java

    r593 r594  
    11package com.db.logger.benchmarks; 
    22 
     3import java.util.*; 
     4import java.util.concurrent.CopyOnWriteArrayList; 
    35import java.util.concurrent.TimeUnit; 
    46import java.util.concurrent.atomic.AtomicInteger; 
    57 
    6 import com.db.logger.api.impl.logger.DemultiplexingSequencer; 
    7 import com.db.logger.api.impl.logger.RecordHelper; 
     8import com.db.logger.api.impl.logger.*; 
    89import com.db.logger.api.impl.logger.buffer.DirectAccessLongBuffer; 
    910import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
    10 import com.db.logger.api.impl.logger.WaitingStrategy; 
    1111import org.openjdk.jmh.annotations.*; 
    1212import org.openjdk.jmh.logic.BlackHole; 
    1313 
    14 import static com.db.logger.api.impl.logger.RecordHelper.NOT_SET; 
     14import static com.db.logger.api.impl.logger.RecordHelper.RecordType.LOG_RECORD; 
     15import static com.db.logger.api.impl.logger.RecordHelper.header; 
    1516 
    1617/** 
     18 * We use many (3) single-writer single-reader ring buffers, one per each 
     19 * 'logging' thread, instead of one multi-writer single reader ring buffer 
     20 * for all them at once. 
     21 * 
    1722 * @author ruslan 
    1823 *         created 22.11.13 at 20:04 
     
    2126@OutputTimeUnit( TimeUnit.NANOSECONDS ) 
    2227@State( Scope.Group ) 
    23 public class BufferWriteAndDrainBenchmark { 
     28public class MultiBuffersWriteAndDrainBenchmark { 
    2429        public static final int LENGTH_POW = Integer.getInteger( "length-pow", 14 ); 
    2530        public static final int CELLS_PER_RECORD = Integer.getInteger( "cells-per-record", 8 );//8longs = 1 cache line 
     
    3035        public static final AtomicInteger ID_GENERATOR = new AtomicInteger( 1 ); 
    3136 
    32         public static final DemultiplexingSequencer.Drainer DRAIN_DUMMY = new DemultiplexingSequencer.Drainer() { 
     37        public static final Sequencer.Drainer DRAIN_DUMMY = new MCSDSequencer.Drainer() { 
    3338                @Override 
    3439                public int available( final long startSequence, 
     
    4247        } 
    4348 
    44         public ILongsBuffer buffer; 
    45         public DemultiplexingSequencer sequencer; 
    46  
    47         public DemultiplexingSequencer.Drainer DRAIN_AND_READ; 
    4849 
    4950        @Setup 
    5051        public void setup() { 
    51                 final int length = 1 << LENGTH_POW; 
    52                 buffer = new DirectAccessLongBuffer( length, RecordHelper.NOT_SET ); 
    53                 sequencer = new DemultiplexingSequencer( length ); 
    54  
    55                 DRAIN_AND_READ = new DemultiplexingSequencer.Drainer() { 
    56                         @Override 
    57                         public int available( final long startSequence, 
    58                                               final long sentinelSequence ) { 
    59                                 int spinsAvailable = 256; 
    60                                 for( long pos = startSequence; pos < sentinelSequence; pos++ ) { 
    61                                         if( ( pos % CELLS_PER_RECORD ) == 0 ) { 
    62                                                 for(; spinsAvailable >= 0; spinsAvailable-- ) { 
    63                                                         final long header = buffer.getVolatile( pos ); 
    64                                                         if( header > 0 ) { 
    65                                                                 buffer.put( pos, NOT_SET ); 
    66                                                                 break; 
    67                                                         } 
    68                                                 } 
    69                                                 if( spinsAvailable < 0 ) { 
    70                                                         return ( int ) ( pos - startSequence ); 
    71                                                 } 
    72                                         } else { 
    73                                                 buffer.get( pos ); 
    74                                         } 
    75                                 } 
    76                                 return ( int ) ( sentinelSequence - startSequence ); 
    77                         } 
    78                 }; 
    7952        } 
    8053 
    8154        @TearDown 
    8255        public void tearDown() { 
    83  
     56                states.clear(); 
    8457        } 
    8558 
     
    8861                //TODO set thread affinity 
    8962        } 
     63 
     64        public static final List<ThreadState> states = new CopyOnWriteArrayList<ThreadState>(); 
    9065 
    9166        /*=============================================================================*/ 
     
    10277        @GroupThreads( 3 )//actually it's (CORES-1) 
    10378        public void writer( final ThreadState ts ) { 
    104                 writeEntry( ts.id, ts.count ); 
     79                writeEntry( ts, ts.id, ts.count ); 
    10580                BlackHole.consumeCPU( WRITER_BACKOFF ); 
    10681        } 
     
    11287                //mostly we do not care about drain latency here, measure just to be aware of it 
    11388                try { 
    114                         sequencer.drainTo( DRAIN_DUMMY ); 
     89                        for( final ThreadState ts : states ) { 
     90                                ts.ringBuffer.drainTo( DRAIN_DUMMY ); 
     91                        } 
    11592                        BlackHole.consumeCPU( 10 ); 
    11693                } catch( Throwable e ) { 
     
    126103        @GroupThreads( 3 ) //actually it's (CORES-1) 
    127104        public void writer2( final ThreadState ts ) { 
    128                 writeEntry( ts.id, ts.count ); 
     105                writeEntry( ts, ts.id, ts.count ); 
    129106                BlackHole.consumeCPU( WRITER_BACKOFF ); 
    130107        } 
     
    136113                //mostly we do not care about drain latency here, measure just to be aware of it 
    137114                try { 
    138                         sequencer.drainTo( DRAIN_AND_READ ); 
     115                        for( final ThreadState ts : states ) { 
     116                                ts.ringBuffer.drainTo( ts.consumingDrainer ); 
     117                        } 
    139118//                      BlackHole.consumeCPU( 100 ); 
    140119                } catch( Throwable e ) { 
     
    145124        /*=============================================================================*/ 
    146125 
    147         private void writeEntry( final int writerId, 
     126        private void writeEntry( final ThreadState ts, 
     127                                 final int writerId, 
    148128                                 final int cellsCount ) { 
    149                 final long position = sequencer.claim( cellsCount, WAITING_STRATEGY ); 
     129                final long position = ts.ringBuffer.claim( cellsCount ); 
    150130                if( position < 0 ) { 
    151131                        System.err.println( "Timeout" ); 
     
    153133                } 
    154134                //reserve 0-th cell for header 
     135                final ILongsBuffer data = ts.ringBuffer.buffer(); 
    155136                for( int i = 1; i < cellsCount; i++ ) { 
    156                         buffer.put( position + i, i ); 
     137                        data.put( position + i, i ); 
    157138                } 
    158139                //write header with SA 
    159                 buffer.putOrdered( position, writerId ); 
     140                data.putOrdered( 
     141                                position, 
     142                                header( 
     143                                                LOG_RECORD, 
     144                                                writerId, 
     145                                                cellsCount - 1 
     146                                ) 
     147                ); 
    160148        } 
    161149 
     
    164152                public final int id = ID_GENERATOR.incrementAndGet(); 
    165153                public final int count = CELLS_PER_RECORD;//but 1 for header 
     154 
     155                public RingBuffer ringBuffer; 
     156                public Sequencer.Drainer consumingDrainer; 
     157 
     158                public ThreadState() { 
     159                        final int length = 1 << LENGTH_POW; 
     160                        final DirectAccessLongBuffer longBuffer = new DirectAccessLongBuffer( 
     161                                        length, 
     162                                        RecordHelper.NOT_SET 
     163                        ); 
     164 
     165                        ringBuffer = new RingBuffer( 
     166                                        new SCSDSequencer( length ), 
     167                                        longBuffer, 
     168                                        WAITING_STRATEGY 
     169                        ); 
     170 
     171                        consumingDrainer = new ConsumingDrainer( longBuffer ); 
     172                } 
     173 
     174                @Setup 
     175                public void setup() { 
     176                        states.add( this ); 
     177                } 
     178 
     179                @TearDown 
     180                public void destroy() { 
     181                        states.remove( this ); 
     182                } 
    166183        } 
    167184 
    168185        public static void main( final String[] args ) throws Exception { 
    169                 final BufferWriteAndDrainBenchmark benchmark = new BufferWriteAndDrainBenchmark(); 
     186                final MultiBuffersWriteAndDrainBenchmark benchmark = new MultiBuffersWriteAndDrainBenchmark(); 
    170187 
    171188                benchmark.setup(); 
     
    196213                Thread.sleep( 300000 ); 
    197214        } 
     215 
    198216} 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/RawLogMessageWriteAndDrainBenchmark.java

    r590 r594  
    3535 
    3636        public ILongsBuffer buffer; 
    37         public DemultiplexingSequencer sequencer; 
     37        public MCSDSequencer sequencer; 
    3838 
    39         public DemultiplexingSequencer.Drainer readingConsumer; 
     39        public MCSDSequencer.Drainer readingConsumer; 
    4040 
    4141        @Setup 
    4242        public void setup() { 
    4343                buffer = new DirectAccessLongBuffer( LENGTH, RecordHelper.NOT_SET ); 
    44                 sequencer = new DemultiplexingSequencer( LENGTH ); 
     44                sequencer = new MCSDSequencer( LENGTH ); 
    4545 
    4646                readingConsumer = new ConsumingDrainer( buffer ); 
     
    103103                public void setup( final RawLogMessageWriteAndDrainBenchmark b ) { 
    104104                        formatter = new RawLogMessage( 
    105                                         new CircularBuffer( b.sequencer, b.buffer, WAITING_STRATEGY ) 
     105                                        new RingBuffer( b.sequencer, b.buffer, WAITING_STRATEGY ) 
    106106                        ).setup( 
    107107                                        new MessageInfo( 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/SimpleLogMessageWriteAndDrainBenchmark.java

    r590 r594  
    88import com.db.logger.api.impl.logger.buffer.DirectAccessLongBuffer; 
    99import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
    10 import com.db.logger.api.impl.logger.formatters.SimpleLogMessage; 
    1110import com.db.logger.api.impl.logger.formatters.SimpleLogMessageExpanded; 
    1211import org.openjdk.jmh.annotations.*; 
    1312import org.openjdk.jmh.logic.BlackHole; 
    14  
    15 import static com.db.logger.api.impl.logger.RecordHelper.*; 
    1613 
    1714/** 
     
    3633 
    3734        public ILongsBuffer buffer; 
    38         public DemultiplexingSequencer sequencer; 
     35        public MCSDSequencer sequencer; 
    3936 
    40         public DemultiplexingSequencer.Drainer readingConsumer; 
     37        public MCSDSequencer.Drainer readingConsumer; 
    4138 
    4239        @Setup 
    4340        public void setup() { 
    4441                buffer = new DirectAccessLongBuffer( LENGTH, RecordHelper.NOT_SET ); 
    45                 sequencer = new DemultiplexingSequencer( LENGTH ); 
     42                sequencer = new MCSDSequencer( LENGTH ); 
    4643 
    4744                readingConsumer = new ConsumingDrainer( buffer ); 
     
    106103                                        id, 
    107104                                        CELLS_PER_RECORD - 1, 
    108                                         new CircularBuffer( b.sequencer, b.buffer, WAITING_STRATEGY ) 
     105                                        new RingBuffer( b.sequencer, b.buffer, WAITING_STRATEGY ) 
    109106                        ); 
    110107                } 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/helpers/MCSDSequencerBenchmark.java

    r593 r594  
    44import java.util.concurrent.atomic.AtomicInteger; 
    55 
    6 import com.db.logger.api.impl.logger.DemultiplexingSequencer; 
     6import com.db.logger.api.impl.logger.MCSDSequencer; 
    77import com.db.logger.api.impl.logger.WaitingStrategy; 
    88import org.apache.log4j.BasicConfigurator; 
     
    1414 * Here we measure raw sequencer performance: just position claiming/reclaiming 
    1515 * <p/> 
    16  * TODO: sometimes hangs! why? 
    1716 * 
    1817 * @author ruslan 
     
    2221@OutputTimeUnit( TimeUnit.NANOSECONDS ) 
    2322@State( Scope.Group ) 
    24 public class SequencerBenchmark { 
    25         private static final Logger log = Logger.getLogger( SequencerBenchmark.class ); 
     23public class MCSDSequencerBenchmark { 
     24        private static final Logger log = Logger.getLogger( MCSDSequencerBenchmark.class ); 
    2625 
    2726        public static final int LENGTH = Integer.getInteger( "length", 1 << 14 ); 
     
    3837        public static final AtomicInteger ID_GENERATOR = new AtomicInteger( 1 ); 
    3938 
    40         public static final DemultiplexingSequencer.Drainer DRAIN_DUMMY = new DemultiplexingSequencer.Drainer() { 
     39        public static final MCSDSequencer.Drainer DRAIN_DUMMY = new MCSDSequencer.Drainer() { 
    4140                @Override 
    4241                public int available( final long startSequence, 
     
    4746 
    4847 
    49         public DemultiplexingSequencer sequencer; 
     48        public MCSDSequencer sequencer; 
    5049 
    5150        @Setup 
    5251        public void setup() { 
    53                 sequencer = new DemultiplexingSequencer( LENGTH ); 
     52                sequencer = new MCSDSequencer( LENGTH ); 
    5453        } 
    5554 
     
    103102 
    104103        public static void main( final String[] args ) throws Exception { 
    105                 final SequencerBenchmark benchmark = new SequencerBenchmark(); 
     104                final MCSDSequencerBenchmark benchmark = new MCSDSequencerBenchmark(); 
    106105 
    107106                benchmark.setup(); 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/helpers/SCSDSequencerBenchmark.java

    r593 r594  
    11package com.db.logger.benchmarks.helpers; 
    22 
     3import java.util.*; 
     4import java.util.concurrent.CopyOnWriteArrayList; 
    35import java.util.concurrent.TimeUnit; 
    46import java.util.concurrent.atomic.AtomicInteger; 
    57 
    6 import com.db.logger.api.impl.logger.DemultiplexingSequencer; 
     8import com.db.logger.api.impl.logger.MCSDSequencer; 
     9import com.db.logger.api.impl.logger.SCSDSequencer; 
    710import com.db.logger.api.impl.logger.WaitingStrategy; 
    811import org.apache.log4j.BasicConfigurator; 
     
    1316/** 
    1417 * Here we measure raw sequencer performance: just position claiming/reclaiming 
     18 * It is single-claimer single-drainer (SCSD) sequencer 
    1519 * <p/> 
    16  * TODO: sometimes hangs! why? 
    1720 * 
    1821 * @author ruslan 
     
    2225@OutputTimeUnit( TimeUnit.NANOSECONDS ) 
    2326@State( Scope.Group ) 
    24 public class SequencerBenchmark { 
    25         private static final Logger log = Logger.getLogger( SequencerBenchmark.class ); 
     27public class SCSDSequencerBenchmark { 
     28        private static final Logger log = Logger.getLogger( SCSDSequencerBenchmark.class ); 
    2629 
    2730        public static final int LENGTH = Integer.getInteger( "length", 1 << 14 ); 
     
    3841        public static final AtomicInteger ID_GENERATOR = new AtomicInteger( 1 ); 
    3942 
    40         public static final DemultiplexingSequencer.Drainer DRAIN_DUMMY = new DemultiplexingSequencer.Drainer() { 
     43        public static final MCSDSequencer.Drainer DRAIN_DUMMY = new MCSDSequencer.Drainer() { 
    4144                @Override 
    4245                public int available( final long startSequence, 
     
    4750 
    4851 
    49         public DemultiplexingSequencer sequencer; 
     52        public static final List<ThreadState> sequencers = new CopyOnWriteArrayList<ThreadState>(); 
    5053 
    5154        @Setup 
    5255        public void setup() { 
    53                 sequencer = new DemultiplexingSequencer( LENGTH ); 
     56 
    5457        } 
    5558 
    5659        @TearDown 
    5760        public void tearDown() { 
    58  
     61                sequencers.clear(); 
    5962        } 
    6063 
     
    7780        @GroupThreads( 3 )//actually it's (CORES-1) 
    7881        public void claimer3( final ThreadState ts ) { 
    79                 claimEntry( ts.id, ts.count ); 
     82                claimEntry( ts.sequencer, ts.id, ts.count ); 
    8083                BlackHole.consumeCPU( WRITER_BACKOFF ); 
    8184        } 
     
    8588        @GroupThreads( 1 ) 
    8689        public void drainer() { 
    87                 sequencer.drainTo( DRAIN_DUMMY ); 
    88                 BlackHole.consumeCPU( WRITER_BACKOFF ); 
     90                for( final ThreadState ts : sequencers ) { 
     91                        ts.sequencer.drainTo( DRAIN_DUMMY ); 
     92                } 
     93//              BlackHole.consumeCPU( WRITER_BACKOFF ); 
    8994        } 
    9095 
    9196        /*=============================================================================*/ 
    9297 
    93         private long claimEntry( final int writerId, 
     98        private long claimEntry( final SCSDSequencer sequencer, 
     99                                 final int writerId, 
    94100                                 final int cellsCount ) { 
    95101                return sequencer.claim( cellsCount, WAITING_STRATEGY ); 
     
    100106                public final int id = ID_GENERATOR.incrementAndGet(); 
    101107                public final int count = CELLS_PER_RECORD; 
     108 
     109                public SCSDSequencer sequencer; 
     110 
     111                @Setup 
     112                public void setup() { 
     113                        sequencer = new SCSDSequencer( LENGTH ); 
     114                        sequencers.add( this ); 
     115                } 
     116 
     117                @TearDown 
     118                public void destroy() { 
     119                        sequencers.remove( this ); 
     120                } 
    102121        } 
    103122 
    104123        public static void main( final String[] args ) throws Exception { 
    105                 final SequencerBenchmark benchmark = new SequencerBenchmark(); 
     124                final SCSDSequencerBenchmark benchmark = new SCSDSequencerBenchmark(); 
    106125 
    107126                benchmark.setup(); 
  • Tests/JAVA/logger/src/test/java/com/db/logger/api/impl/logger/MCSDSequencerTest.java

    r589 r594  
    11package com.db.logger.api.impl.logger; 
    2  
    3 import org.junit.Test; 
    4  
    5 import static com.db.logger.api.impl.logger.DemultiplexingSequencer.INVALID_INDEX; 
    6 import static org.junit.Assert.*; 
    72 
    83/** 
     
    105 *         created 7/4/12 at 4:25 PM 
    116 */ 
    12 public class DemultiplexingSequencerTest { 
    13  
    14  
    15         @Test( expected = IllegalArgumentException.class ) 
    16         public void cantCreateEmptySequencer() throws Exception { 
    17                 new DemultiplexingSequencer( 0 ); 
    18         } 
    19  
    20         @Test 
    21         public void sequencerSizeIndexesAvailableInFreshSequencer() throws Exception { 
    22                 final int size = 4; 
    23                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    24                 for( int i = 0; i < size; i++ ) { 
    25                         assertNotEquals( INVALID_INDEX, sequencer.claim() ); 
    26                 } 
    27         } 
    28  
    29         @Test 
    30         public void nextAvailableReturnsInvalidIndexWhenSequencerExhausted() throws Exception { 
    31                 final int size = 4; 
    32                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    33  
    34                 assertExactIndexesAvailable( sequencer, size ); 
    35         } 
    36  
    37         @Test 
    38         public void drainerNotCalledIfNothingClaimedYet() throws Exception { 
    39                 final int size = 4; 
    40                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    41                 sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    42                         @Override 
    43                         public int available( final long startSequence, 
    44                                               final long sentinelSequence ) { 
    45                                 fail( "Called with " + startSequence + ", " + sentinelSequence ); 
    46                                 return 0; 
    47                         } 
    48                 } ); 
    49         } 
    50  
    51         @Test 
    52         public void drainToSuppliesEveryIndexInClaimedRange() throws Exception { 
    53                 final int size = 4; 
    54                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    55                 exhaustSequencer( sequencer, size ); 
    56                 sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    57                         @Override 
    58                         public int available( final long startSequence, 
    59                                               final long sentinelSequence ) { 
    60                                 assertEquals( 0, startSequence ); 
    61                                 assertEquals( size, sentinelSequence ); 
    62                                 return 0; 
    63                         } 
    64                 } ); 
    65  
    66         } 
    67  
    68         @Test 
    69         public void drainMakesEntriesAvailableForReuse() throws Exception { 
    70                 final int size = 4; 
    71  
    72                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    73  
    74                 exhaustSequencer( sequencer, size ); 
    75  
    76                 sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    77                         @Override 
    78                         public int available( final long startSequence, 
    79                                               final long sentinelSequence ) { 
    80                                 return ( int ) ( sentinelSequence - startSequence ); 
    81                         } 
    82                 } ); 
    83  
    84                 assertExactIndexesAvailable( sequencer, size ); 
    85         } 
    86  
    87         @Test( expected = IllegalStateException.class ) 
    88         public void drainFailsIfReturnedNegativeValue() throws Exception { 
    89                 final int size = 4; 
    90  
    91                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    92  
    93                 exhaustSequencer( sequencer, size ); 
    94  
    95                 sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    96                         @Override 
    97                         public int available( final long startSequence, 
    98                                               final long sentinelSequence ) { 
    99                                 return -1; 
    100                         } 
    101                 } ); 
    102         } 
    103  
    104         @Test( expected = IllegalStateException.class ) 
    105         public void drainFailsIfReturnedLengthMoreThenMaximum() throws Exception { 
    106                 final int size = 4; 
    107  
    108                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    109  
    110                 exhaustSequencer( sequencer, size ); 
    111  
    112                 sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    113                         @Override 
    114                         public int available( final long startSequence, 
    115                                               final long sentinelSequence ) { 
    116                                 final long length = sentinelSequence - startSequence; 
    117                                 return ( int ) ( length + 1 ); 
    118                         } 
    119                 } ); 
    120         } 
    121  
    122  
    123         @Test 
    124         public void drainToMakesAvailableToReuseOnlyDrainedIndexes() throws Exception { 
    125                 final int size = 4; 
    126  
    127                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    128  
    129                 exhaustSequencer( sequencer, size ); 
    130  
    131                 final int toProcess = 2; 
    132  
    133                 sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    134                         @Override 
    135                         public int available( final long startSequence, 
    136                                               final long sentinelSequence ) { 
    137                                 return toProcess; 
    138                         } 
    139                 } ); 
    140  
    141                 assertExactIndexesAvailable( sequencer, toProcess ); 
    142         } 
    143  
    144         @Test 
    145         public void exceptionInProcessorJustStopsProcessingAndChangeNothing() throws Exception { 
    146                 final int size = 4; 
    147  
    148                 final DemultiplexingSequencer sequencer = createSequencer( size ); 
    149  
    150                 exhaustSequencer( sequencer, size ); 
    151  
    152                 try { 
    153                         sequencer.drainTo( new DemultiplexingSequencer.Drainer() { 
    154                                 @Override 
    155                                 public int available( final long startSequence, 
    156                                                       final long sentinelSequence ) { 
    157                                         throw new RuntimeException(); 
    158                                 } 
    159                         } ); 
    160                         fail( "Exception was suppressed!" ); 
    161                 } catch( RuntimeException e ) { 
    162                 } 
    163  
    164                 assertExactIndexesAvailable( sequencer, 0 ); 
    165         } 
    166  
    167         private void exhaustSequencer( final DemultiplexingSequencer sequencer, 
    168                                        final int size ) { 
    169                 for( int i = 0; i < size; i++ ) { 
    170                         sequencer.claim(); 
    171                 } 
    172         } 
    173  
    174         private static DemultiplexingSequencer createSequencer( final int size ) { 
    175                 return new DemultiplexingSequencer( size ); 
    176         } 
    177  
    178  
    179         private static void assertExactIndexesAvailable( final DemultiplexingSequencer sequencer, 
    180                                                          final int expectedAvailable ) { 
    181                 for( int i = 0; i < expectedAvailable; i++ ) { 
    182                         assertNotEquals( i + " from " + expectedAvailable + " expected", 
    183                                          INVALID_INDEX, 
    184                                          sequencer.claim() 
    185                         ); 
    186                 } 
    187                 assertEquals( 
    188                                 "More entries, then expected(" + expectedAvailable + ")", 
    189                                 INVALID_INDEX, 
    190                                 sequencer.claim() 
    191                 ); 
     7public class MCSDSequencerTest extends SequencerTestBase { 
     8        @Override 
     9        protected Sequencer createSequencer( final int size ) { 
     10                return new MCSDSequencer( size ); 
    19211        } 
    19312} 
Note: See TracChangeset for help on using the changeset viewer.