Changeset 567 for Tests


Ignore:
Timestamp:
09/24/12 19:41:35 (10 years ago)
Author:
BegemoT
Message:
 
Location:
Tests/JAVA/test/src/main/java/test/threads/demultiplexor
Files:
1 deleted
1 edited
2 copied

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/test/src/main/java/test/threads/demultiplexor/DemultiplexorBenchmark.java

    r565 r567  
    141141        } 
    142142 
    143         private static final class Entry implements CommittableEntry { 
     143        private static final class Entry implements CommittableEntry, MarkableEntry { 
    144144                private static final int DONE = 1; 
    145145                private static final int NOT_DONE = 0; 
    146146 
    147                 private static final AtomicIntegerFieldUpdater<Entry> doneUpdater = newUpdater( 
    148                                 "done" 
     147                private static final AtomicIntegerFieldUpdater<Entry> markUpdater = newUpdater( 
     148                                "mark" 
    149149                ); 
    150150 
    151151                public long value; 
    152                 private volatile int done; 
     152                private volatile int mark; 
    153153 
    154154                public void reset() { 
    155                         doneUpdater.lazySet( this, NOT_DONE ); 
     155                        markWith( NOT_DONE ); 
    156156                } 
    157157 
    158158                public void commit() { 
    159                         doneUpdater.lazySet( this, DONE ); 
     159                        markWith( DONE ); 
    160160                } 
    161161 
    162162                public boolean isCommitted() { 
    163                         return done == DONE; 
     163                        return mark == DONE; 
     164                } 
     165 
     166                @Override 
     167                public void markWith( final int mark ) { 
     168                        markUpdater.lazySet( this, mark ); 
     169                } 
     170 
     171                @Override 
     172                public boolean casMark( final int expectedMark, 
     173                                        final int newMark ) { 
     174                        return markUpdater.compareAndSet( this, expectedMark, newMark ); 
     175                } 
     176 
     177                @Override 
     178                public int mark() { 
     179                        return mark; 
    164180                } 
    165181 
     
    170186                        } 
    171187                }; 
     188 
    172189 
    173190                private static final AtomicIntegerFieldUpdater<Entry> newUpdater( final String fieldName ) { 
  • Tests/JAVA/test/src/main/java/test/threads/demultiplexor/MarkableEntry.java

    r565 r567  
    55 *         created 15.09.12 at 14:28 
    66 */ 
    7 public interface CommittableEntry { 
    8         public void commit(); 
     7public interface MarkableEntry { 
     8        public void markWith( final int mark ); 
    99 
    10         public boolean isCommitted(); 
     10        public boolean casMark( final int expectedMark, 
     11                                final int newMark ); 
    1112 
    12         public void reset(); 
     13        public int mark(); 
     14 
    1315} 
  • Tests/JAVA/test/src/main/java/test/threads/demultiplexor/SoftDemultiplexorBuffer.java

    r565 r567  
    1313 *         created 15.09.12 at 1:24 
    1414 */ 
    15 public class DemultiplexorBuffer<T extends CommittableEntry> implements Demultiplexor<T> { 
     15public class SoftDemultiplexorBuffer<T extends MarkableEntry> implements Demultiplexor<T> { 
    1616 
    17         private static final AtomicLongFieldUpdater<DemultiplexorBuffer> headUpdater = newUpdater( 
     17        private static final int FREE = 0; 
     18        private static final int CLAIMED = 1; 
     19        private static final int FILLED = 2; 
     20 
     21        private static final AtomicLongFieldUpdater<SoftDemultiplexorBuffer> headUpdater = newUpdater( 
    1822                        "headCursor" 
    1923        ); 
    2024 
    21         private static final AtomicLongFieldUpdater<DemultiplexorBuffer> tailUpdater = newUpdater( 
    22                         "tailCursor" 
    23         ); 
    2425 
    2526        private final T[] buffer; 
    2627 
    2728        private final int mask; 
    28         /** 
    29          * Elements range: [headCursor, tailCursor) 
    30          * <p/> 
    31          * (tailCursor - headCursor) == elements count 
    32          * <p/> 
    33          * 0 <= (tailCursor - headCursor) <= length  => state invariant 
    34          * <p/> 
    35          * tailCursor - headCursor == length         => buffer is full 
    36          * tailCursor - headCursor == 0              => buffer is empty 
    37          * <p/> 
    38          * (headCursor % size ) is the index of first item in buffer 
    39          * (tailCursor % size ) is the index of _cell_ for _next last item_ 
    40          */ 
     29 
    4130        private volatile long headCursor = 0; 
    4231 
    43         private volatile long tailCursor = 0; 
    4432 
    45         public DemultiplexorBuffer( final int size, 
    46                                     final EventFactory<T> factory ) { 
     33        public SoftDemultiplexorBuffer( final int size, 
     34                                        final EventFactory<T> factory ) { 
    4735                checkArgument( size > 0, "size(%s) must be > 0", size ); 
    4836                checkArgument( factory != null, "factory can't be null" ); 
     
    6654                final int length = buffer.length; 
    6755                while( true ) { 
    68                         final long head = headCursor; 
    69                         final long tail = tailCursor; 
    70                         final long elements = tail - head; 
    71                         if( elements >= length ) { 
    72                                 //we're full now -- wait 
    73                         } else if( tailUpdater.compareAndSet( this, tail, tail + 1 ) ) { 
    74                                 final int index = index( tail ); 
     56                        final long barrier = headCursor + length; 
     57                        for( long seq = headCursor; seq < barrier; seq++ ) { 
     58                                final int index = index( seq ); 
    7559                                final T entry = buffer[index]; 
    76                                 publisher.fill( entry, arg ); 
    77                                 entry.commit(); 
    78                                 return; 
    79                         } else { 
    80                                 //once more try 
     60                                if( entry.mark() == FREE ) { 
     61                                        if( entry.casMark( FREE, CLAIMED ) ) { 
     62                                                publisher.fill( entry, arg ); 
     63                                                entry.markWith( FILLED ); 
     64                                                //TODO store seq for future? 
     65                                                return; 
     66                                        } 
     67                                } 
    8168                        } 
    8269                } 
     
    8673        public void drainTo( final EntryProcessor<T> processor ) { 
    8774                checkArgument( processor != null, "processor can't be null" ); 
    88                 final long tail = tailCursor; 
    8975                long seq = headCursor; 
    9076                try { 
    91                         for(; seq < tail; seq++ ) { 
     77                        for(; ; seq++ ) { 
    9278                                final int index = index( seq ); 
    9379                                final T record = buffer[index]; 
    94                                 if( !record.isCommitted() ) { 
    95                                         return; 
     80                                final int mark = record.mark(); 
     81                                switch( mark ) { 
     82                                        case FREE: 
     83                                        case CLAIMED: { 
     84                                                return; 
     85                                        } 
     86                                        case FILLED: { 
     87                                                final boolean processed = processor.process( record ); 
     88                                                if( !processed ) { 
     89                                                        return; 
     90                                                } else { 
     91                                                        record.markWith( FREE ); 
     92                                                } 
     93                                        } 
     94 
    9695                                } 
    97                                 final boolean processed = processor.process( record ); 
    98                                 if( !processed ) { 
    99                                         return; 
    100                                 } 
    101                                 record.reset(); 
    10296                        } 
    10397                } finally { 
     
    106100        } 
    107101 
    108         private int index( final long tail ) { 
    109                 return ( int ) ( tail & mask ); 
     102        private int index( final long sequence ) { 
     103                return ( int ) ( sequence & mask ); 
    110104        } 
    111105 
    112         private static final AtomicLongFieldUpdater<DemultiplexorBuffer> newUpdater( final String fieldName ) { 
     106        private static final AtomicLongFieldUpdater<SoftDemultiplexorBuffer> newUpdater( final String fieldName ) { 
    113107                if( Boolean.getBoolean( "concurrent.use-unsafe" ) ) { 
    114108                        return UnsafeAtomicLongFieldUpdater.newUpdater( 
    115                                         DemultiplexorBuffer.class, 
     109                                        SoftDemultiplexorBuffer.class, 
    116110                                        fieldName 
    117111                        ); 
    118112                } else { 
    119113                        return AtomicLongFieldUpdater.newUpdater( 
    120                                         DemultiplexorBuffer.class, 
     114                                        SoftDemultiplexorBuffer.class, 
    121115                                        fieldName 
    122116                        ); 
Note: See TracChangeset for help on using the changeset viewer.