Changeset 570 for Tests


Ignore:
Timestamp:
11/14/12 05:25:13 (10 years ago)
Author:
BegemoT
Message:
 
Location:
Tests/JAVA/test/src/main/java
Files:
8 added
1 edited

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/test/src/main/java/com/lmax/disruptor/UnpaddedSingleThreadedClaimStrategy.java

    r541 r570  
    1919 
    2020import com.lmax.disruptor.util.MutableLong; 
    21 import com.lmax.disruptor.util.PaddedLong; 
    2221 
    2322import static com.lmax.disruptor.util.Util.getMinimumSequence; 
     
    2524/** 
    2625 * Optimised strategy can be used when there is a single publisher thread claiming sequences. 
    27  * 
     26 * <p/> 
    2827 * This strategy must <b>not</b> be used when multiple threads are used for publishing concurrently on the same {@link com.lmax.disruptor.Sequencer} 
    2928 */ 
    3029public final class UnpaddedSingleThreadedClaimStrategy 
    31     implements ClaimStrategy 
    32 { 
    33     private final int bufferSize; 
    34     private final MutableLong minGatingSequence = new MutableLong(Sequencer.INITIAL_CURSOR_VALUE); 
    35     private final MutableLong claimSequence = new MutableLong(Sequencer.INITIAL_CURSOR_VALUE); 
     30                implements ClaimStrategy { 
     31        private final int bufferSize; 
     32        private final MutableLong minGatingSequence = new MutableLong( Sequencer.INITIAL_CURSOR_VALUE ); 
     33        private final MutableLong claimSequence = new MutableLong( Sequencer.INITIAL_CURSOR_VALUE ); 
    3634 
    37     /** 
    38      * Construct a new single threaded publisher {@link com.lmax.disruptor.ClaimStrategy} for a given buffer size. 
    39      * 
    40      * @param bufferSize for the underlying data structure. 
    41      */ 
    42     public UnpaddedSingleThreadedClaimStrategy( final int bufferSize ) 
    43     { 
    44         this.bufferSize = bufferSize; 
    45     } 
     35        /** 
     36         * Construct a new single threaded publisher {@link com.lmax.disruptor.ClaimStrategy} for a given buffer size. 
     37         * 
     38         * @param bufferSize for the underlying data structure. 
     39         */ 
     40        public UnpaddedSingleThreadedClaimStrategy( final int bufferSize ) { 
     41                this.bufferSize = bufferSize; 
     42        } 
    4643 
    47     @Override 
    48     public int getBufferSize() 
    49     { 
    50         return bufferSize; 
    51     } 
     44        @Override 
     45        public int getBufferSize() { 
     46                return bufferSize; 
     47        } 
    5248 
    53     @Override 
    54     public long getSequence() 
    55     { 
    56         return claimSequence.get(); 
    57     } 
     49        @Override 
     50        public long getSequence() { 
     51                return claimSequence.get(); 
     52        } 
    5853 
    59     @Override 
    60     public boolean hasAvailableCapacity(final int availableCapacity, final Sequence[] dependentSequences) 
    61     { 
    62         final long wrapPoint = (claimSequence.get() + availableCapacity) - bufferSize; 
    63         if (wrapPoint > minGatingSequence.get()) 
    64         { 
    65             long minSequence = getMinimumSequence(dependentSequences); 
    66             minGatingSequence.set(minSequence); 
     54        @Override 
     55        public boolean hasAvailableCapacity( final int availableCapacity, final Sequence[] dependentSequences ) { 
     56                final long wrapPoint = ( claimSequence.get() + availableCapacity ) - bufferSize; 
     57                if( wrapPoint > minGatingSequence.get() ) { 
     58                        long minSequence = getMinimumSequence( dependentSequences ); 
     59                        minGatingSequence.set( minSequence ); 
    6760 
    68             if (wrapPoint > minSequence) 
    69             { 
    70                 return false; 
    71             } 
    72         } 
     61                        if( wrapPoint > minSequence ) { 
     62                                return false; 
     63                        } 
     64                } 
    7365 
    74         return true; 
    75     } 
     66                return true; 
     67        } 
    7668 
    77     @Override 
    78     public long incrementAndGet(final Sequence[] dependentSequences) 
    79     { 
    80         long nextSequence = claimSequence.get() + 1L; 
    81         claimSequence.set(nextSequence); 
    82         waitForFreeSlotAt(nextSequence, dependentSequences); 
     69        @Override 
     70        public long incrementAndGet( final Sequence[] dependentSequences ) { 
     71                long nextSequence = claimSequence.get() + 1L; 
     72                claimSequence.set( nextSequence ); 
     73                waitForFreeSlotAt( nextSequence, dependentSequences ); 
    8374 
    84         return nextSequence; 
    85     } 
     75                return nextSequence; 
     76        } 
    8677 
    87     @Override 
    88     public long incrementAndGet(final int delta, final Sequence[] dependentSequences) 
    89     { 
    90         long nextSequence = claimSequence.get() + delta; 
    91         claimSequence.set(nextSequence); 
    92         waitForFreeSlotAt(nextSequence, dependentSequences); 
     78        @Override 
     79        public long incrementAndGet( final int delta, final Sequence[] dependentSequences ) { 
     80                long nextSequence = claimSequence.get() + delta; 
     81                claimSequence.set( nextSequence ); 
     82                waitForFreeSlotAt( nextSequence, dependentSequences ); 
    9383 
    94         return nextSequence; 
    95     } 
     84                return nextSequence; 
     85        } 
    9686 
    97     @Override 
    98     public void setSequence(final long sequence, final Sequence[] dependentSequences) 
    99     { 
    100         claimSequence.set(sequence); 
    101         waitForFreeSlotAt(sequence, dependentSequences); 
    102     } 
     87        @Override 
     88        public void setSequence( final long sequence, final Sequence[] dependentSequences ) { 
     89                claimSequence.set( sequence ); 
     90                waitForFreeSlotAt( sequence, dependentSequences ); 
     91        } 
    10392 
    104     @Override 
    105     public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize) 
    106     { 
    107         cursor.set(sequence); 
    108     } 
     93        @Override 
     94        public void serialisePublishing( final long sequence, final Sequence cursor, final int batchSize ) { 
     95                cursor.set( sequence ); 
     96        } 
    10997 
    110     private void waitForFreeSlotAt(final long sequence, final Sequence[] dependentSequences) 
    111     { 
    112         final long wrapPoint = sequence - bufferSize; 
    113         if (wrapPoint > minGatingSequence.get()) 
    114         { 
    115             long minSequence; 
    116             while (wrapPoint > (minSequence = getMinimumSequence(dependentSequences))) 
    117             { 
    118                 LockSupport.parkNanos(1L); 
    119             } 
     98        private void waitForFreeSlotAt( final long sequence, final Sequence[] dependentSequences ) { 
     99                final long wrapPoint = sequence - bufferSize; 
     100                if( wrapPoint > minGatingSequence.get() ) { 
     101                        long minSequence; 
     102                        while( wrapPoint > ( minSequence = getMinimumSequence( dependentSequences ) ) ) { 
     103                                LockSupport.parkNanos( 1L ); 
     104                        } 
    120105 
    121             minGatingSequence.set(minSequence); 
    122         } 
    123     } 
     106                        minGatingSequence.set( minSequence ); 
     107                } 
     108        } 
     109 
     110        @Override 
     111        public long checkAndIncrement( final int availableCapacity, 
     112                                       final int delta, 
     113                                       final Sequence[] gatingSequences ) throws InsufficientCapacityException { 
     114                throw new UnsupportedOperationException( "Method not implemented yet" ); 
     115        } 
    124116} 
Note: See TracChangeset for help on using the changeset viewer.