- Timestamp:
- 02/02/13 12:27:11 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
Tests/JAVA/test/src/main/java/test/threads/experiment2/Main.java
r575 r576 10 10 import com.lmax.disruptor.dsl.Disruptor; 11 11 12 import static com.google.common.base.Preconditions.checkElementIndex; 12 13 import static test.threads.queue.stressed.ITask.BenchmarkResult; 13 14 14 15 /** 15 * IDEA:16 * use common cyclic buffer with small capacity17 * in single-threaded case just go around,18 * in multithreaded use lzaySet cursors19 *20 16 * @author ruslan 21 17 * created 27.01.13 at 23:00 22 18 */ 23 19 public class Main { 24 25 26 private static final int RUNS = 16; 27 private static final int TURNS = 64; 28 29 private static final int MESSAGES = 1 << 14; 30 31 private static final int REPOSITORY_SIZE = 1 << 15; 32 private static final int SCAN_STEP = 64; 20 private static final boolean PREDICABLE_SCAN = Boolean.getBoolean( "predictable" ); 21 22 private static final int RUNS = Integer.getInteger( "runs", 16 ); 23 private static final int TURNS = Integer.getInteger( "turns", 512 ); 24 25 private static final int MESSAGES = 1 << 8; 26 27 private static final int REPOSITORY_SIZE = Integer.getInteger( "repository-size", 1 << 16 ); 28 private static final int READ_PER_MESSAGE = 128; 29 private static final int READ_STEP = 1025; 30 33 31 private static final ThreadAffinityService AFFINITY_SERVICE = ThreadAffinityUtils.defaultAffinityService(); 34 32 private static final CPULayoutService LAYOUT_SERVICE = ThreadAffinityUtils.defaultLayoutService(); 35 33 36 34 public static void main( final String[] args ) { 35 System.out.printf( 36 "Repo: %d, reads: %d/msg, step: %d %s\n" + 37 "%d runs by %d turns by %d messages each\n", 38 REPOSITORY_SIZE, READ_PER_MESSAGE, READ_STEP, 39 PREDICABLE_SCAN ? "predictable" : "random", 40 RUNS, TURNS, MESSAGES 41 ); 37 42 38 43 … … 43 48 Arrays.fill( repositoryB, 1.0 ); 44 49 45 final TaskA a = new TaskA( repositoryA ); 46 final TaskB b = new TaskB( repositoryB ); 50 51 final Phase a = new Phase( repositoryA ); 52 final Phase b = new Phase( repositoryB ); 47 53 48 54 final Function<Message, Message> function = composeSync( a, b ); 49 55 50 final BenchmarkResult[] results = benchmark( function, messages, TURNS, RUNS ); 56 final BenchmarkResult[] results = benchmark( 57 function, 58 messages, 59 TURNS, 60 RUNS 61 ); 51 62 52 63 for( final BenchmarkResult result : results ) { … … 63 74 final int repositorySize ) { 64 75 final Message[] messages = new Message[size]; 76 final int step = REPOSITORY_SIZE / size; 65 77 for( int i = 0; i < messages.length; i++ ) { 66 messages[i] = new Message(); 67 messages[i].from = i % repositorySize; 68 messages[i].to = ( size - 1 - i ) & repositorySize; 69 messages[i].amount = i / 2 - size; 78 final Message message = new Message(); 79 80 final int index = i * step % repositorySize; 81 message.index = index; 82 83 checkElementIndex( message.index, repositorySize ); 84 85 message.amount = i / 2 - size; 86 87 messages[i] = message; 70 88 } 71 89 return messages; 72 90 } 91 92 private static void putStressOnMemory( final double[] repository, 93 final int offset, 94 final double amount ) { 95 if( PREDICABLE_SCAN ) { 96 for( int i = 0; i < READ_PER_MESSAGE; i++ ) { 97 final int index = ( i * READ_STEP + offset ) % repository.length; 98 repository[index] += amount; 99 } 100 } else { 101 int index = offset; 102 for( int i = 0; i < READ_PER_MESSAGE; i++ ) { 103 index = Math.abs( index * offset ) % repository.length; 104 repository[index] += amount; 105 } 106 } 107 } 108 73 109 74 110 private static BenchmarkResult[] benchmark( final Function<Message, Message> function, … … 77 113 final int runs ) { 78 114 final BenchmarkResult[] results = new BenchmarkResult[runs]; 79 for( int j = 0; j < runs; j++ ) {115 for( int run = 0; run < runs; run++ ) { 80 116 final long startedAt = System.nanoTime(); 81 for( int i = 0; i < turns; i++ ) {117 for( int turn = 0; turn < turns; turn++ ) { 82 118 benchmarkTurn( messages, function ); 83 119 } 84 120 final long finishedAt = System.nanoTime(); 85 121 86 results[ j] = new BenchmarkResult(122 results[run] = new BenchmarkResult( 87 123 -1, 88 124 finishedAt - startedAt, … … 93 129 } 94 130 95 96 131 private static void benchmarkTurn( final Message[] messages, 97 132 final Function<Message, Message> f ) { 98 133 for( final Message message : messages ) { 134 message.index++; 99 135 f.apply( message ); 100 136 } 101 137 } 102 138 103 private static Function<Message, Message> composeSync( final TaskA a, 104 final TaskB b ) { 139 140 private static Function<Message, Message> composeSync( final Function<Message, Message> a, 141 final Function<Message, Message> b ) { 105 142 return Functions.compose( b, a ); 106 143 } … … 118 155 }, 119 156 Executors.newSingleThreadExecutor(), 120 new SingleThreadedClaimStrategy( 32),157 new SingleThreadedClaimStrategy( 16 ), 121 158 new BusySpinWaitStrategy() 122 159 ); … … 137 174 } 138 175 139 140 public static class TaskA implements Function<Message, Message> { 176 public static class Phase implements Function<Message, Message> { 177 141 178 private final double[] repository; 142 179 143 public TaskA( final double[] repository ) {180 public Phase( final double[] repository ) { 144 181 this.repository = repository; 145 182 } … … 147 184 @Override 148 185 public Message apply( final Message message ) { 149 final double amount = -message.amount;150 final int offset = message. to;186 final double amount = message.amount; 187 final int offset = message.index; 151 188 putStressOnMemory( repository, offset, amount ); 189 //actually, message just pass through 152 190 return message; 153 191 } 154 192 155 } 156 157 public static class TaskB implements Function<Message, Message> { 158 159 private final double[] repository; 160 161 public TaskB( final double[] repository ) { 162 this.repository = repository; 163 } 164 165 @Override 166 public Message apply( final Message message ) { 167 final int offset = message.from; 168 final double amount = message.amount; 169 putStressOnMemory( repository, offset, amount ); 170 return message; 171 } 172 173 } 174 175 private static void putStressOnMemory( final double[] repository, 176 final int offset, 177 final double amount ) { 178 for( int i = 0; i < repository.length; i += SCAN_STEP ) { 179 final int index = ( i + offset ) % repository.length; 180 repository[index] += amount; 181 } 193 182 194 } 183 195 184 196 private static class Message { 185 public int from; 186 public int to; 197 public int index; 187 198 public double amount; 188 199 } … … 198 209 public Message translateTo( final Message event, 199 210 final long sequence ) { 200 event.from = message.from; 201 event.to = message.to; 211 event.index = message.index; 202 212 event.amount = message.amount; 203 213 return event;
Note: See TracChangeset
for help on using the changeset viewer.