- Timestamp:
- 09/24/12 19:41:35 (10 years ago)
- Location:
- Tests/JAVA/test/src/main/java/test/threads/demultiplexor
- Files:
-
- 1 deleted
- 1 edited
- 2 copied
Legend:
- Unmodified
- Added
- Removed
-
Tests/JAVA/test/src/main/java/test/threads/demultiplexor/DemultiplexorBenchmark.java
r565 r567 141 141 } 142 142 143 private static final class Entry implements CommittableEntry {143 private static final class Entry implements CommittableEntry, MarkableEntry { 144 144 private static final int DONE = 1; 145 145 private static final int NOT_DONE = 0; 146 146 147 private static final AtomicIntegerFieldUpdater<Entry> doneUpdater = newUpdater(148 " done"147 private static final AtomicIntegerFieldUpdater<Entry> markUpdater = newUpdater( 148 "mark" 149 149 ); 150 150 151 151 public long value; 152 private volatile int done;152 private volatile int mark; 153 153 154 154 public void reset() { 155 doneUpdater.lazySet( this,NOT_DONE );155 markWith( NOT_DONE ); 156 156 } 157 157 158 158 public void commit() { 159 doneUpdater.lazySet( this,DONE );159 markWith( DONE ); 160 160 } 161 161 162 162 public boolean isCommitted() { 163 return done == DONE; 163 return mark == DONE; 164 } 165 166 @Override 167 public void markWith( final int mark ) { 168 markUpdater.lazySet( this, mark ); 169 } 170 171 @Override 172 public boolean casMark( final int expectedMark, 173 final int newMark ) { 174 return markUpdater.compareAndSet( this, expectedMark, newMark ); 175 } 176 177 @Override 178 public int mark() { 179 return mark; 164 180 } 165 181 … … 170 186 } 171 187 }; 188 172 189 173 190 private static final AtomicIntegerFieldUpdater<Entry> newUpdater( final String fieldName ) { -
Tests/JAVA/test/src/main/java/test/threads/demultiplexor/MarkableEntry.java
r565 r567 5 5 * created 15.09.12 at 14:28 6 6 */ 7 public interface CommittableEntry {8 public void commit();7 public interface MarkableEntry { 8 public void markWith( final int mark ); 9 9 10 public boolean isCommitted(); 10 public boolean casMark( final int expectedMark, 11 final int newMark ); 11 12 12 public void reset(); 13 public int mark(); 14 13 15 } -
Tests/JAVA/test/src/main/java/test/threads/demultiplexor/SoftDemultiplexorBuffer.java
r565 r567 13 13 * created 15.09.12 at 1:24 14 14 */ 15 public class DemultiplexorBuffer<T extends CommittableEntry> implements Demultiplexor<T> {15 public class SoftDemultiplexorBuffer<T extends MarkableEntry> implements Demultiplexor<T> { 16 16 17 private static final AtomicLongFieldUpdater<DemultiplexorBuffer> headUpdater = newUpdater( 17 private static final int FREE = 0; 18 private static final int CLAIMED = 1; 19 private static final int FILLED = 2; 20 21 private static final AtomicLongFieldUpdater<SoftDemultiplexorBuffer> headUpdater = newUpdater( 18 22 "headCursor" 19 23 ); 20 24 21 private static final AtomicLongFieldUpdater<DemultiplexorBuffer> tailUpdater = newUpdater(22 "tailCursor"23 );24 25 25 26 private final T[] buffer; 26 27 27 28 private final int mask; 28 /** 29 * Elements range: [headCursor, tailCursor) 30 * <p/> 31 * (tailCursor - headCursor) == elements count 32 * <p/> 33 * 0 <= (tailCursor - headCursor) <= length => state invariant 34 * <p/> 35 * tailCursor - headCursor == length => buffer is full 36 * tailCursor - headCursor == 0 => buffer is empty 37 * <p/> 38 * (headCursor % size ) is the index of first item in buffer 39 * (tailCursor % size ) is the index of _cell_ for _next last item_ 40 */ 29 41 30 private volatile long headCursor = 0; 42 31 43 private volatile long tailCursor = 0;44 32 45 public DemultiplexorBuffer( final int size,46 final EventFactory<T> factory ) {33 public SoftDemultiplexorBuffer( final int size, 34 final EventFactory<T> factory ) { 47 35 checkArgument( size > 0, "size(%s) must be > 0", size ); 48 36 checkArgument( factory != null, "factory can't be null" ); … … 66 54 final int length = buffer.length; 67 55 while( true ) { 68 final long head = headCursor; 69 final long tail = tailCursor; 70 final long elements = tail - head; 71 if( elements >= length ) { 72 //we're full now -- wait 73 } else if( tailUpdater.compareAndSet( this, tail, tail + 1 ) ) { 74 final int index = index( tail ); 56 final long barrier = headCursor + length; 57 for( long seq = headCursor; seq < barrier; seq++ ) { 58 final int index = index( seq ); 75 59 final T entry = buffer[index]; 76 publisher.fill( entry, arg ); 77 entry.commit(); 78 return; 79 } else { 80 //once more try 60 if( entry.mark() == FREE ) { 61 if( entry.casMark( FREE, CLAIMED ) ) { 62 publisher.fill( entry, arg ); 63 entry.markWith( FILLED ); 64 //TODO store seq for future? 65 return; 66 } 67 } 81 68 } 82 69 } … … 86 73 public void drainTo( final EntryProcessor<T> processor ) { 87 74 checkArgument( processor != null, "processor can't be null" ); 88 final long tail = tailCursor;89 75 long seq = headCursor; 90 76 try { 91 for(; seq < tail; seq++ ) {77 for(; ; seq++ ) { 92 78 final int index = index( seq ); 93 79 final T record = buffer[index]; 94 if( !record.isCommitted() ) { 95 return; 80 final int mark = record.mark(); 81 switch( mark ) { 82 case FREE: 83 case CLAIMED: { 84 return; 85 } 86 case FILLED: { 87 final boolean processed = processor.process( record ); 88 if( !processed ) { 89 return; 90 } else { 91 record.markWith( FREE ); 92 } 93 } 94 96 95 } 97 final boolean processed = processor.process( record );98 if( !processed ) {99 return;100 }101 record.reset();102 96 } 103 97 } finally { … … 106 100 } 107 101 108 private int index( final long tail) {109 return ( int ) ( tail& mask );102 private int index( final long sequence ) { 103 return ( int ) ( sequence & mask ); 110 104 } 111 105 112 private static final AtomicLongFieldUpdater< DemultiplexorBuffer> newUpdater( final String fieldName ) {106 private static final AtomicLongFieldUpdater<SoftDemultiplexorBuffer> newUpdater( final String fieldName ) { 113 107 if( Boolean.getBoolean( "concurrent.use-unsafe" ) ) { 114 108 return UnsafeAtomicLongFieldUpdater.newUpdater( 115 DemultiplexorBuffer.class,109 SoftDemultiplexorBuffer.class, 116 110 fieldName 117 111 ); 118 112 } else { 119 113 return AtomicLongFieldUpdater.newUpdater( 120 DemultiplexorBuffer.class,114 SoftDemultiplexorBuffer.class, 121 115 fieldName 122 116 );
Note: See TracChangeset
for help on using the changeset viewer.