Changeset 575 for Tests


Ignore:
Timestamp:
01/31/13 20:31:25 (9 years ago)
Author:
BegemoT
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/test/src/main/java/test/threads/experiment2/Main.java

    r574 r575  
    44import java.util.concurrent.Executors; 
    55 
     6import com.affinity.*; 
    67import com.google.common.base.Function; 
    78import com.google.common.base.Functions; 
     
    2425 
    2526        private static final int RUNS = 16; 
    26         private static final int TURNS = 1000; 
    27         private static final int MESSAGES = 1 << 16; 
    28         private static final int REPOSITORY_SIZE = 1 << 16; 
    29  
    30         public static void main( String[] args ) { 
     27        private static final int TURNS = 64; 
     28 
     29        private static final int MESSAGES = 1 << 14; 
     30 
     31        private static final int REPOSITORY_SIZE = 1 << 15; 
     32        private static final int SCAN_STEP = 64; 
     33        private static final ThreadAffinityService AFFINITY_SERVICE = ThreadAffinityUtils.defaultAffinityService(); 
     34        private static final CPULayoutService LAYOUT_SERVICE = ThreadAffinityUtils.defaultLayoutService(); 
     35 
     36        public static void main( final String[] args ) { 
     37 
     38 
    3139                final Message[] messages = messages( MESSAGES, REPOSITORY_SIZE ); 
    3240                final double[] repositoryA = new double[REPOSITORY_SIZE]; 
     
    3846                final TaskB b = new TaskB( repositoryB ); 
    3947 
    40                 final Function<Message, Message> syncF = composeSync( a, b ); 
    41  
    42                 final BenchmarkResult[] results = benchmark( syncF, messages, TURNS, RUNS ); 
     48                final Function<Message, Message> function = composeSync( a, b ); 
     49 
     50                final BenchmarkResult[] results = benchmark( function, messages, TURNS, RUNS ); 
    4351 
    4452                for( final BenchmarkResult result : results ) { 
     
    5058                        ); 
    5159                } 
    52         } 
    53  
    54         private static Function<Message, Message> composeSync( final TaskA a, 
    55                                                                final TaskB b ) { 
    56                 return Functions.compose( b, a ); 
    5760        } 
    5861 
     
    6871                return messages; 
    6972        } 
    70  
    7173 
    7274        private static BenchmarkResult[] benchmark( final Function<Message, Message> function, 
     
    9193        } 
    9294 
     95 
    9396        private static void benchmarkTurn( final Message[] messages, 
    9497                                           final Function<Message, Message> f ) { 
     
    98101        } 
    99102 
     103        private static Function<Message, Message> composeSync( final TaskA a, 
     104                                                               final TaskB b ) { 
     105                return Functions.compose( b, a ); 
     106        } 
     107 
    100108 
    101109        private static Function<Message, Message> composeAsync( final Function<Message, Message> b, 
    102110                                                                final Function<Message, Message> a ) { 
     111                attachToCore( 0 ); 
    103112                final Disruptor<Message> disruptor = new Disruptor<Message>( 
    104113                                new EventFactory<Message>() { 
     
    112121                                new BusySpinWaitStrategy() 
    113122                ); 
    114                 disruptor.handleEventsWith( new EventHandler<Message>() { 
    115                         @Override 
    116                         public void onEvent( final Message event, 
    117                                              final long sequence, 
    118                                              final boolean endOfBatch ) throws Exception { 
    119                                 b.apply( event ); 
    120                         } 
    121                 } ); 
     123                disruptor.handleEventsWith( new MessageEventHandler( b ) ); 
    122124                disruptor.start(); 
    123125 
     
    145147                @Override 
    146148                public Message apply( final Message message ) { 
    147                         repository[message.from] -= message.amount; 
     149                        final double amount = -message.amount; 
     150                        final int offset = message.to; 
     151                        putStressOnMemory( repository, offset, amount ); 
    148152                        return message; 
    149153                } 
     154 
    150155        } 
    151156 
    152157        public static class TaskB implements Function<Message, Message> { 
     158 
    153159                private final double[] repository; 
    154160 
     
    159165                @Override 
    160166                public Message apply( final Message message ) { 
    161                         repository[message.from] += message.amount; 
     167                        final int offset = message.from; 
     168                        final double amount = message.amount; 
     169                        putStressOnMemory( repository, offset, amount ); 
    162170                        return message; 
     171                } 
     172 
     173        } 
     174 
     175        private static void putStressOnMemory( final double[] repository, 
     176                                               final int offset, 
     177                                               final double amount ) { 
     178                for( int i = 0; i < repository.length; i += SCAN_STEP ) { 
     179                        final int index = ( i + offset ) % repository.length; 
     180                        repository[index] += amount; 
    163181                } 
    164182        } 
     
    186204                } 
    187205        } 
     206 
     207        private static class MessageEventHandler implements EventHandler<Message>, LifecycleAware { 
     208                private final Function<Message, Message> b; 
     209 
     210                public MessageEventHandler( final Function<Message, Message> b ) { 
     211                        this.b = b; 
     212                } 
     213 
     214                @Override 
     215                public void onEvent( final Message event, 
     216                                     final long sequence, 
     217                                     final boolean endOfBatch ) throws Exception { 
     218                        b.apply( event ); 
     219                } 
     220 
     221                @Override 
     222                public void onStart() { 
     223                        attachToCore( 1 ); 
     224                } 
     225 
     226                @Override 
     227                public void onShutdown() { 
     228 
     229                } 
     230        } 
     231 
     232        private static void attachToCore( final int cpuNo ) { 
     233                if( AFFINITY_SERVICE.isActuallyAvailable() ) { 
     234                        final CPU cpu = LAYOUT_SERVICE.cpu( cpuNo ); 
     235                        AFFINITY_SERVICE.restrictCurrentThreadTo( cpu ); 
     236                } else { 
     237                        System.out.println( "Affinity binding is not available here" ); 
     238                } 
     239        } 
    188240} 
Note: See TracChangeset for help on using the changeset viewer.