Changeset 576 for Tests


Ignore:
Timestamp:
02/02/13 12:27:11 (9 years ago)
Author:
BegemoT
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/test/src/main/java/test/threads/experiment2/Main.java

    r575 r576  
    1010import com.lmax.disruptor.dsl.Disruptor; 
    1111 
     12import static com.google.common.base.Preconditions.checkElementIndex; 
    1213import static test.threads.queue.stressed.ITask.BenchmarkResult; 
    1314 
    1415/** 
    15  * IDEA: 
    16  * use common cyclic buffer with small capacity 
    17  * in single-threaded case just go around, 
    18  * in multithreaded use lzaySet cursors 
    19  * 
    2016 * @author ruslan 
    2117 *         created 27.01.13 at 23:00 
    2218 */ 
    2319public class Main { 
    24  
    25  
    26         private static final int RUNS = 16; 
    27         private static final int TURNS = 64; 
    28  
    29         private static final int MESSAGES = 1 << 14; 
    30  
    31         private static final int REPOSITORY_SIZE = 1 << 15; 
    32         private static final int SCAN_STEP = 64; 
     20        private static final boolean PREDICABLE_SCAN = Boolean.getBoolean( "predictable" ); 
     21 
     22        private static final int RUNS = Integer.getInteger( "runs", 16 ); 
     23        private static final int TURNS = Integer.getInteger( "turns", 512 ); 
     24 
     25        private static final int MESSAGES = 1 << 8; 
     26 
     27        private static final int REPOSITORY_SIZE = Integer.getInteger( "repository-size", 1 << 16 ); 
     28        private static final int READ_PER_MESSAGE = 128; 
     29        private static final int READ_STEP = 1025; 
     30 
    3331        private static final ThreadAffinityService AFFINITY_SERVICE = ThreadAffinityUtils.defaultAffinityService(); 
    3432        private static final CPULayoutService LAYOUT_SERVICE = ThreadAffinityUtils.defaultLayoutService(); 
    3533 
    3634        public static void main( final String[] args ) { 
     35                System.out.printf( 
     36                                "Repo: %d, reads: %d/msg, step: %d %s\n" + 
     37                                                "%d runs by %d turns by %d messages each\n", 
     38                                REPOSITORY_SIZE, READ_PER_MESSAGE, READ_STEP, 
     39                                PREDICABLE_SCAN ? "predictable" : "random", 
     40                                RUNS, TURNS, MESSAGES 
     41                ); 
    3742 
    3843 
     
    4348                Arrays.fill( repositoryB, 1.0 ); 
    4449 
    45                 final TaskA a = new TaskA( repositoryA ); 
    46                 final TaskB b = new TaskB( repositoryB ); 
     50 
     51                final Phase a = new Phase( repositoryA ); 
     52                final Phase b = new Phase( repositoryB ); 
    4753 
    4854                final Function<Message, Message> function = composeSync( a, b ); 
    4955 
    50                 final BenchmarkResult[] results = benchmark( function, messages, TURNS, RUNS ); 
     56                final BenchmarkResult[] results = benchmark( 
     57                                function, 
     58                                messages, 
     59                                TURNS, 
     60                                RUNS 
     61                ); 
    5162 
    5263                for( final BenchmarkResult result : results ) { 
     
    6374                                           final int repositorySize ) { 
    6475                final Message[] messages = new Message[size]; 
     76                final int step = REPOSITORY_SIZE / size; 
    6577                for( int i = 0; i < messages.length; i++ ) { 
    66                         messages[i] = new Message(); 
    67                         messages[i].from = i % repositorySize; 
    68                         messages[i].to = ( size - 1 - i ) & repositorySize; 
    69                         messages[i].amount = i / 2 - size; 
     78                        final Message message = new Message(); 
     79 
     80                        final int index = i * step % repositorySize; 
     81                        message.index = index; 
     82 
     83                        checkElementIndex( message.index, repositorySize ); 
     84 
     85                        message.amount = i / 2 - size; 
     86 
     87                        messages[i] = message; 
    7088                } 
    7189                return messages; 
    7290        } 
     91 
     92        private static void putStressOnMemory( final double[] repository, 
     93                                               final int offset, 
     94                                               final double amount ) { 
     95                if( PREDICABLE_SCAN ) { 
     96                        for( int i = 0; i < READ_PER_MESSAGE; i++ ) { 
     97                                final int index = ( i * READ_STEP + offset ) % repository.length; 
     98                                repository[index] += amount; 
     99                        } 
     100                } else { 
     101                        int index = offset; 
     102                        for( int i = 0; i < READ_PER_MESSAGE; i++ ) { 
     103                                index = Math.abs( index * offset ) % repository.length; 
     104                                repository[index] += amount; 
     105                        } 
     106                } 
     107        } 
     108 
    73109 
    74110        private static BenchmarkResult[] benchmark( final Function<Message, Message> function, 
     
    77113                                                    final int runs ) { 
    78114                final BenchmarkResult[] results = new BenchmarkResult[runs]; 
    79                 for( int j = 0; j < runs; j++ ) { 
     115                for( int run = 0; run < runs; run++ ) { 
    80116                        final long startedAt = System.nanoTime(); 
    81                         for( int i = 0; i < turns; i++ ) { 
     117                        for( int turn = 0; turn < turns; turn++ ) { 
    82118                                benchmarkTurn( messages, function ); 
    83119                        } 
    84120                        final long finishedAt = System.nanoTime(); 
    85121 
    86                         results[j] = new BenchmarkResult( 
     122                        results[run] = new BenchmarkResult( 
    87123                                        -1, 
    88124                                        finishedAt - startedAt, 
     
    93129        } 
    94130 
    95  
    96131        private static void benchmarkTurn( final Message[] messages, 
    97132                                           final Function<Message, Message> f ) { 
    98133                for( final Message message : messages ) { 
     134                        message.index++; 
    99135                        f.apply( message ); 
    100136                } 
    101137        } 
    102138 
    103         private static Function<Message, Message> composeSync( final TaskA a, 
    104                                                                final TaskB b ) { 
     139 
     140        private static Function<Message, Message> composeSync( final Function<Message, Message> a, 
     141                                                               final Function<Message, Message> b ) { 
    105142                return Functions.compose( b, a ); 
    106143        } 
     
    118155                                }, 
    119156                                Executors.newSingleThreadExecutor(), 
    120                                 new SingleThreadedClaimStrategy( 32 ), 
     157                                new SingleThreadedClaimStrategy( 16 ), 
    121158                                new BusySpinWaitStrategy() 
    122159                ); 
     
    137174        } 
    138175 
    139  
    140         public static class TaskA implements Function<Message, Message> { 
     176        public static class Phase implements Function<Message, Message> { 
     177 
    141178                private final double[] repository; 
    142179 
    143                 public TaskA( final double[] repository ) { 
     180                public Phase( final double[] repository ) { 
    144181                        this.repository = repository; 
    145182                } 
     
    147184                @Override 
    148185                public Message apply( final Message message ) { 
    149                         final double amount = -message.amount; 
    150                         final int offset = message.to; 
     186                        final double amount = message.amount; 
     187                        final int offset = message.index; 
    151188                        putStressOnMemory( repository, offset, amount ); 
     189                        //actually, message just pass through 
    152190                        return message; 
    153191                } 
    154192 
    155         } 
    156  
    157         public static class TaskB implements Function<Message, Message> { 
    158  
    159                 private final double[] repository; 
    160  
    161                 public TaskB( final double[] repository ) { 
    162                         this.repository = repository; 
    163                 } 
    164  
    165                 @Override 
    166                 public Message apply( final Message message ) { 
    167                         final int offset = message.from; 
    168                         final double amount = message.amount; 
    169                         putStressOnMemory( repository, offset, amount ); 
    170                         return message; 
    171                 } 
    172  
    173         } 
    174  
    175         private static void putStressOnMemory( final double[] repository, 
    176                                                final int offset, 
    177                                                final double amount ) { 
    178                 for( int i = 0; i < repository.length; i += SCAN_STEP ) { 
    179                         final int index = ( i + offset ) % repository.length; 
    180                         repository[index] += amount; 
    181                 } 
     193 
    182194        } 
    183195 
    184196        private static class Message { 
    185                 public int from; 
    186                 public int to; 
     197                public int index; 
    187198                public double amount; 
    188199        } 
     
    198209                public Message translateTo( final Message event, 
    199210                                            final long sequence ) { 
    200                         event.from = message.from; 
    201                         event.to = message.to; 
     211                        event.index = message.index; 
    202212                        event.amount = message.amount; 
    203213                        return event; 
Note: See TracChangeset for help on using the changeset viewer.