Ignore:
Timestamp:
09/16/12 16:51:53 (8 years ago)
Author:
BegemoT
Message:
 
Location:
Tests/JAVA/test/src/main/java/test/threads/queue/stressed
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/test/src/main/java/test/threads/queue/stressed/ABQxNTask.java

    r498 r563  
    11package test.threads.queue.stressed; 
    22 
    3 import java.util.concurrent.*; 
     3import java.util.concurrent.ArrayBlockingQueue; 
     4import java.util.concurrent.BlockingQueue; 
     5import java.util.concurrent.Executor; 
    46 
    57import com.google.common.base.Throwables; 
     
    810import org.apache.commons.logging.LogFactory; 
    911import test.helpers.Config; 
     12import test.threads.queue.common.BaseEnqueuer; 
    1013 
    1114/** 
     
    1619 */ 
    1720public class ABQxNTask<E> extends AbstractTask<E> { 
    18     private static final Log log = LogFactory.getLog( ABQxNTask.class ); 
     21        private static final Log log = LogFactory.getLog( ABQxNTask.class ); 
    1922 
    20     private final int nodes; 
     23        private final int nodes; 
    2124 
    22     public ABQxNTask( final Config config ) { 
    23         super( config ); 
    24         nodes = config.getAsInt( "pipeline.nodes" ); 
    25     } 
     25        public ABQxNTask( final Config config ) { 
     26                super( config ); 
     27                nodes = config.getAsInt( "pipeline.nodes" ); 
     28        } 
    2629 
    2730 
    28     @Override 
    29     public int workersRequired() { 
    30         return nodes; 
    31     } 
     31        @Override 
     32        public int workersRequired() { 
     33                return nodes; 
     34        } 
    3235 
    33     private IUsefulWorkEmulator<E> lastEmulator; 
    34     private BlockingQueue<E> outbox; 
    35     private PassThroughNode<E>[] tasks; 
     36        private IUsefulWorkEmulator<E> lastEmulator; 
     37        private BlockingQueue<E> outbox; 
     38        private PassThroughNode<E>[] tasks; 
    3639 
    37     @Override 
    38     protected EventEnqueuer _initialize( final Executor threadPool, 
    39                                          final IUsefulWorkEmulator<E>[] emulators, 
    40                                          final EventFactory<E> eventFactory ) { 
    41         final BlockingQueue<E> inbox = createQueue(); 
     40        @Override 
     41        protected BaseEnqueuer[] initializeEnqueuers( final Executor threadPool, 
     42                                                      final IUsefulWorkEmulator<E>[] emulators, 
     43                                                      final EventFactory<E> eventFactory ) { 
     44                final BlockingQueue<E> inbox = createQueue(); 
    4245 
    4346 
    44         BlockingQueue<E> _outbox = inbox; 
    45         tasks = new PassThroughNode[nodes - 1]; 
    46         for ( int i = 0; i < tasks.length; i++ ) { 
    47             final BlockingQueue<E> _inbox = _outbox; 
    48             _outbox = createQueue(); 
     47                BlockingQueue<E> _outbox = inbox; 
     48                tasks = new PassThroughNode[nodes - 1]; 
     49                for( int i = 0; i < tasks.length; i++ ) { 
     50                        final BlockingQueue<E> _inbox = _outbox; 
     51                        _outbox = createQueue(); 
    4952 
    50             final String name = String.valueOf( i ); 
    51             final PassThroughNode<E> task = new PassThroughNode<E>( 
    52                     name, 
    53                     _inbox, 
    54                     emulators[i], 
    55                     _outbox 
    56             ); 
     53                        final String name = String.valueOf( i ); 
     54                        final PassThroughNode<E> task = new PassThroughNode<E>( 
     55                                        name, 
     56                                        _inbox, 
     57                                        emulators[i], 
     58                                        _outbox 
     59                        ); 
    5760 
    58             tasks[i] = task; 
    59             threadPool.execute( task ); 
    60         } 
     61                        tasks[i] = task; 
     62                        threadPool.execute( task ); 
     63                } 
    6164 
    62         //this is last task data 
    63         this.outbox = _outbox; 
    64         lastEmulator = emulators[nodes - 1]; 
     65                //this is last task data 
     66                this.outbox = _outbox; 
     67                lastEmulator = emulators[nodes - 1]; 
    6568 
    66         return new Enqueuer( eventsInBatch(), eventFactory, inbox ); 
    67     } 
     69                return new BaseEnqueuer[] { new Enqueuer( eventsInBatch(), eventFactory, inbox ) }; 
     70        } 
    6871 
    69     @Override 
    70     public void terminate() { 
    71         super.terminate(); 
    72         for ( final PassThroughNode<E> task : tasks ) { 
    73             task.terminate(); 
    74         } 
    75     } 
     72        @Override 
     73        public void terminate() { 
     74                super.terminate(); 
     75                for( final PassThroughNode<E> task : tasks ) { 
     76                        task.terminate(); 
     77                } 
     78        } 
    7679 
    77     private ArrayBlockingQueue<E> createQueue() { 
    78         return new ArrayBlockingQueue<E>( bufferSize() ); 
    79     } 
     80        private ArrayBlockingQueue<E> createQueue() { 
     81                return new ArrayBlockingQueue<E>( bufferSize() ); 
     82        } 
    8083 
    81     @Override 
    82     protected long dequeueBatchEvents( final int eventsInBatch ) throws Exception { 
    83         for ( int i = 0; i < eventsInBatch; i++ ) { 
    84             final E event = outbox.take(); 
    85             lastEmulator.spendCPUTime( event ); 
    86         } 
     84        @Override 
     85        protected long dequeueBatchEvents( final int eventsInBatch ) throws Exception { 
     86                for( int i = 0; i < eventsInBatch; i++ ) { 
     87                        final E event = outbox.take(); 
     88                        lastEmulator.spendCPUTime( event ); 
     89                } 
    8790 
    88         return 0; 
    89     } 
     91                return 0; 
     92        } 
    9093 
    91     private static class PassThroughNode<E> implements Runnable { 
    92         protected final String name; 
    93         protected final IUsefulWorkEmulator<E> emulator; 
    94         protected final BlockingQueue<E> inbox; 
    95         protected final BlockingQueue<E> outbox; 
     94        private static class PassThroughNode<E> implements Runnable { 
     95                protected final String name; 
     96                protected final IUsefulWorkEmulator<E> emulator; 
     97                protected final BlockingQueue<E> inbox; 
     98                protected final BlockingQueue<E> outbox; 
    9699 
    97         public PassThroughNode( final String name, 
    98                                 final BlockingQueue<E> inbox, 
    99                                 final IUsefulWorkEmulator<E> emulator, 
    100                                 final BlockingQueue<E> outbox ) { 
    101             this.name = name; 
    102             this.emulator = emulator; 
    103             this.inbox = inbox; 
    104             this.outbox = outbox; 
    105         } 
     100                public PassThroughNode( final String name, 
     101                                        final BlockingQueue<E> inbox, 
     102                                        final IUsefulWorkEmulator<E> emulator, 
     103                                        final BlockingQueue<E> outbox ) { 
     104                        this.name = name; 
     105                        this.emulator = emulator; 
     106                        this.inbox = inbox; 
     107                        this.outbox = outbox; 
     108                } 
    106109 
    107         private volatile Thread currentThread; 
     110                private volatile Thread currentThread; 
    108111 
    109         @Override 
    110         public void run() { 
    111             try { 
    112                 currentThread = Thread.currentThread(); 
    113                 while ( true ) { 
    114                     makeStep(); 
    115                 } 
    116             } catch ( InterruptedException e ) { 
    117                 log.info( "PassThroughNode[" + name + "] interrupted" ); 
    118             } catch ( Throwable t ) { 
    119                 log.error( "PassThroughNode[" + name + "] exited unexpectedly", t ); 
    120                 throw Throwables.propagate( t ); 
    121             } 
    122         } 
     112                @Override 
     113                public void run() { 
     114                        try { 
     115                                currentThread = Thread.currentThread(); 
     116                                while( true ) { 
     117                                        makeStep(); 
     118                                } 
     119                        } catch( InterruptedException e ) { 
     120                                log.info( "PassThroughNode[" + name + "] interrupted" ); 
     121                        } catch( Throwable t ) { 
     122                                log.error( "PassThroughNode[" + name + "] exited unexpectedly", t ); 
     123                                throw Throwables.propagate( t ); 
     124                        } 
     125                } 
    123126 
    124         protected void makeStep() throws InterruptedException { 
    125             final E taken = inbox.take(); 
    126             emulator.spendCPUTime( taken ); 
    127             outbox.put( taken ); 
    128         } 
     127                protected void makeStep() throws InterruptedException { 
     128                        final E taken = inbox.take(); 
     129                        emulator.spendCPUTime( taken ); 
     130                        outbox.put( taken ); 
     131                } 
    129132 
    130         public void terminate() { 
    131             currentThread.interrupt(); 
    132         } 
     133                public void terminate() { 
     134                        currentThread.interrupt(); 
     135                } 
    133136 
    134         @Override 
    135         public String toString() { 
    136             return name; 
    137         } 
    138     } 
     137                @Override 
     138                public String toString() { 
     139                        return name; 
     140                } 
     141        } 
    139142 
    140     private class Enqueuer extends EventEnqueuer<E> { 
    141         private final BlockingQueue<E> inbox; 
     143        private class Enqueuer extends EventEnqueuer<E> { 
     144                private final BlockingQueue<E> inbox; 
    142145 
    143         private Enqueuer( final int eventsInBatch, 
    144                           final EventFactory<E> factory, 
    145                           final BlockingQueue<E> inbox ) { 
    146             super( eventsInBatch, factory ); 
    147             this.inbox = inbox; 
    148         } 
     146                private Enqueuer( final int eventsInBatch, 
     147                                  final EventFactory<E> factory, 
     148                                  final BlockingQueue<E> inbox ) { 
     149                        super( eventsInBatch, factory ); 
     150                        this.inbox = inbox; 
     151                } 
    149152 
    150         @Override 
    151         protected void enqueueEventsBatch( final int eventsInBatch, 
    152                                            final EventFactory<E> factory ) throws Exception { 
    153             for ( int i = 0; i < eventsInBatch; i++ ) { 
    154                 inbox.put( factory.newInstance() ); 
    155             } 
    156         } 
    157     } 
     153                @Override 
     154                protected void enqueueEventsBatch( final int eventsInBatch, 
     155                                                   final EventFactory<E> factory ) throws Exception { 
     156                        for( int i = 0; i < eventsInBatch; i++ ) { 
     157                                inbox.put( factory.newInstance() ); 
     158                        } 
     159                } 
     160        } 
    158161} 
  • Tests/JAVA/test/src/main/java/test/threads/queue/stressed/AbstractTask.java

    r523 r563  
    1919 */ 
    2020public abstract class AbstractTask<E> implements ITask<E> { 
    21     private static final Log log = LogFactory.getLog( AbstractTask.class ); 
    22  
    23     /** 5 секунд должно быть достаточно для всех! */ 
    24     private static final long MAX_WAIT_TIME = 5000L; 
    25  
    26     private final int bufferSize; 
    27     private final int eventsInBatch; 
    28  
    29     protected AbstractTask() { 
    30         this( TaskBenchmark.QUEUE_CAPACITY, TaskBenchmark.EVENTS_IN_BATCH ); 
    31     } 
    32  
    33     protected AbstractTask( final int bufferSize ) { 
    34         this( bufferSize, TaskBenchmark.EVENTS_IN_BATCH ); 
    35     } 
    36  
    37     protected AbstractTask( final Config config ) { 
    38         this( 
    39                 config.getAsInt( "task.buffer-size", TaskBenchmark.QUEUE_CAPACITY ), 
    40                 config.getAsInt( "task.events-in-batch", TaskBenchmark.EVENTS_IN_BATCH ) 
    41         ); 
    42     } 
    43  
    44     protected AbstractTask( final int bufferSize, 
    45                             final int eventsInBatch ) { 
    46         checkArgument( 
    47                 eventsInBatch < bufferSize, 
    48                 "eventsInBatch(%s) must be < bufferSize(%s)", 
    49                 eventsInBatch, bufferSize 
    50         ); 
    51         this.bufferSize = bufferSize; 
    52         this.eventsInBatch = eventsInBatch; 
    53     } 
    54  
    55     private boolean initialized = false; 
    56     private EventFactory<E> eventFactory = null; 
    57     private BaseEnqueuer eventEnqueuer = null; 
    58     private Executor threadPool = null; 
    59  
    60     private boolean executing = false; 
    61  
    62     @Override 
    63     public synchronized final void initialize( final Executor threadPool, 
    64                                                final IUsefulWorkEmulator<E>[] emulators, 
    65                                                final EventFactory<E> factory ) { 
    66         checkArgument( threadPool != null, "threadPool can't be null" ); 
    67         checkArgument( emulators != null, "emulators can't be null" ); 
    68         checkArgument( emulators.length > 0, "emulators.length(%s) can't be 0", emulators.length ); 
    69  
    70         checkState( !initialized, "Can't initialize more then once" ); 
    71  
    72         this.eventFactory = factory; 
    73         this.threadPool = threadPool; 
    74         try { 
    75             eventEnqueuer = _initialize( threadPool, emulators, factory ); 
    76             checkState( eventEnqueuer != null, "eventEnqueuer can't be null" ); 
    77             threadPool.execute( eventEnqueuer ); 
    78         } catch ( Exception e ) { 
    79             throw Throwables.propagate( e ); 
    80         } 
    81         initialized = true; 
    82     } 
    83  
    84     public int bufferSize() { 
    85         return bufferSize; 
    86     } 
    87  
    88     public int eventsInBatch() { 
    89         return eventsInBatch; 
    90     } 
    91  
    92     public synchronized boolean isInitialized() { 
    93         return initialized; 
    94     } 
    95  
    96     public synchronized EventFactory<E> eventFactory() { 
    97         checkState( initialized, "Task is not initialized" ); 
    98         return eventFactory; 
    99     } 
    100  
    101     protected abstract BaseEnqueuer _initialize( final Executor threadPool, 
    102                                                  final IUsefulWorkEmulator<E>[] emulators, 
    103                                                  final EventFactory<E> factory ) throws Exception; 
    104  
    105     @Override 
    106     public final BenchmarkResult[] execute( final long millisToBogus, 
    107                                             final long millisToMeasure, 
    108                                             final int measurements ) { 
    109         checkState( initialized, "Can't execute uninitialized task" ); 
    110         checkState( !executing, "Previous execution not finished yet" ); 
    111         checkState( eventEnqueuer != null, "eventEnqueuer can't be null" ); 
    112         checkArgument( measurements >= 1, "measurements(%s) can't be <1", measurements ); 
    113         executing = true; 
    114         try { 
    115             return executeImpl( millisToBogus, millisToMeasure, measurements ); 
    116         } catch ( Exception e ) { 
    117             throw Throwables.propagate( e ); 
    118         } finally { 
    119             executing = false; 
    120         } 
    121     } 
    122  
    123     @Override 
    124     public void terminate() { 
    125         checkState( initialized, "Can't terminate uninitialized task" ); 
    126         checkState( eventEnqueuer != null, "eventEnqueuer can't be null" ); 
    127         eventEnqueuer.terminate(); 
    128     } 
    129  
    130     protected BenchmarkResult[] executeImpl( final long millisToBogus, 
    131                                              final long millisToMeasure, 
    132                                              final int measurements ) throws Exception { 
    133         eventEnqueuer.waitForInitialized(); 
    134  
    135         eventEnqueuer.start(); 
    136         eventEnqueuer.waitForStarting(); 
    137  
    138         //dequeue some packets before start timer to warm/fill up pipeline (not JIT!) 
    139         final BenchmarkResult bogusResult = dequeueWhileTimeIsNotEllapsed( millisToBogus ); 
    140  
    141         final long msToOneMeasure = millisToMeasure / measurements; 
    142         checkState( 
    143                 msToOneMeasure > 1, 
    144                 "msToMeasure(%s) = millisToMeasure(%s)/measurements(%s) must be > 1", 
    145                 msToOneMeasure, millisToMeasure, measurements 
    146         ); 
    147         final BenchmarkResult[] results = new BenchmarkResult[measurements]; 
    148         long packetsCompleted = 0; 
    149         for ( int i = 0; i < results.length; i++ ) { 
    150             results[i] = dequeueWhileTimeIsNotEllapsed( msToOneMeasure ); 
    151             packetsCompleted += results[i].packetsCompleted; 
    152         } 
    153  
    154         long eventsDequeued = bogusResult.packetsCompleted + packetsCompleted; 
    155  
    156         eventEnqueuer.pause(); 
    157         //wait for packets exhausted 
    158         Thread.yield(); 
    159         //здесь периодически возникает ситуация, когда мы подвисаем. Понять причины точно не удается 
    160         //но чтобы не подвешивать эксперименты я здесь просто ввел ограничение времени 
    161         final long startedAt = System.currentTimeMillis(); 
    162         while ( !eventEnqueuer.is( State.WAITING ) ) { 
    163             while ( eventEnqueuer.eventsEnqueued() > eventsDequeued ) { 
    164 //            log.info( "Waiting for " + eventEnqueuer.eventsEnqueued() + " > " + eventsDequeued ); 
    165                 dequeueBatchEvents( eventsInBatch ); 
    166                 eventsDequeued += eventsInBatch; 
    167             } 
    168             Thread.yield(); 
    169             final long current = System.currentTimeMillis(); 
    170             if ( current - startedAt > MAX_WAIT_TIME ) { 
    171                 //идите в жопу, я -- заканчиваю 
    172                 break; 
    173             } 
    174         } 
    175         return results; 
    176     } 
    177  
    178     private BenchmarkResult dequeueWhileTimeIsNotEllapsed( final long timeLimit ) throws Exception { 
    179         long eventsDequeued = 0; 
    180         final long startedAt = System.currentTimeMillis(); 
    181         final long startedAtNs = System.nanoTime(); 
    182         while ( true ) { 
    183             dequeueBatchEvents( eventsInBatch ); 
    184             eventsDequeued += eventsInBatch; 
    185  
    186             final long finishedAtMs = System.currentTimeMillis(); 
    187             final long ellapsedMs = finishedAtMs - startedAt; 
    188             if ( ellapsedMs >= timeLimit ) { 
    189                 final long finishedAtNs = System.nanoTime(); 
    190                 final long ellapsedNs = finishedAtNs - startedAtNs; 
    191  
    192                 final BenchmarkResult result = new BenchmarkResult( 
    193                         ellapsedMs, 
    194                         ellapsedNs, 
    195                         eventsDequeued 
    196                 ); 
    197                 return result; 
    198             } 
    199         } 
    200     } 
    201  
    202     protected abstract long dequeueBatchEvents( final int eventsInBatch ) throws Exception; 
    203  
    204     @Override 
    205     public String toString() { 
    206         return String.format( 
    207                 "%s[workers:%d][buffer:%d]", 
    208                 getClass().getSimpleName(), 
    209                 workersRequired(), 
    210                 bufferSize() 
    211         ); 
    212     } 
    213  
    214     protected abstract static class EventEnqueuer<E> extends BaseEnqueuer<E> { 
    215  
    216         private final EventFactory<E> eventFactory; 
    217  
    218  
    219         public EventEnqueuer( final int eventsInBatch, 
    220                               final EventFactory<E> eventFactory ) { 
    221             super( eventsInBatch ); 
    222             this.eventFactory = eventFactory; 
    223         } 
    224  
    225         @Override 
    226         protected void enqueueEventsBatch( final int eventsInBatch ) throws Exception { 
    227             enqueueEventsBatch( eventsInBatch, eventFactory ); 
    228         } 
    229  
    230         protected abstract void enqueueEventsBatch( final int eventsInBatch, 
    231                                                     final EventFactory<E> factory ) throws Exception; 
    232  
    233     } 
     21        private static final Log log = LogFactory.getLog( AbstractTask.class ); 
     22 
     23        /** 5 секунд должно быть достаточно для всех! */ 
     24        private static final long MAX_WAIT_TIME = 5000L; 
     25 
     26        private final int bufferSize; 
     27        private final int eventsInBatch; 
     28 
     29        protected AbstractTask() { 
     30                this( TaskBenchmark.QUEUE_CAPACITY, TaskBenchmark.EVENTS_IN_BATCH ); 
     31        } 
     32 
     33        protected AbstractTask( final int bufferSize ) { 
     34                this( bufferSize, TaskBenchmark.EVENTS_IN_BATCH ); 
     35        } 
     36 
     37        protected AbstractTask( final Config config ) { 
     38                this( 
     39                                config.getAsInt( "task.buffer-size", TaskBenchmark.QUEUE_CAPACITY ), 
     40                                config.getAsInt( "task.events-in-batch", TaskBenchmark.EVENTS_IN_BATCH ) 
     41                ); 
     42        } 
     43 
     44        protected AbstractTask( final int bufferSize, 
     45                                final int eventsInBatch ) { 
     46                checkArgument( 
     47                                eventsInBatch < bufferSize, 
     48                                "eventsInBatch(%s) must be < bufferSize(%s)", 
     49                                eventsInBatch, bufferSize 
     50                ); 
     51                this.bufferSize = bufferSize; 
     52                this.eventsInBatch = eventsInBatch; 
     53        } 
     54 
     55        private boolean initialized = false; 
     56        private EventFactory<E> eventFactory = null; 
     57        private BaseEnqueuer[] eventEnqueuers = null; 
     58        private Executor threadPool = null; 
     59 
     60        private boolean executing = false; 
     61 
     62        @Override 
     63        public synchronized final void initialize( final Executor threadPool, 
     64                                                   final IUsefulWorkEmulator<E>[] emulators, 
     65                                                   final EventFactory<E> factory ) { 
     66                checkArgument( threadPool != null, "threadPool can't be null" ); 
     67                checkArgument( emulators != null, "emulators can't be null" ); 
     68                checkArgument( emulators.length > 0, "emulators.length(%s) can't be 0", emulators.length ); 
     69 
     70                checkState( !initialized, "Can't initialize more then once" ); 
     71 
     72                this.eventFactory = factory; 
     73                this.threadPool = threadPool; 
     74                try { 
     75                        eventEnqueuers = initializeEnqueuers( threadPool, emulators, factory ); 
     76                        checkState( eventEnqueuers != null, "eventEnqueuers can't be null" ); 
     77                        for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     78                                checkState( eventEnqueuer != null, "eventEnqueuers[i] can't be null" ); 
     79                                threadPool.execute( eventEnqueuer ); 
     80                        } 
     81                } catch( Exception e ) { 
     82                        throw Throwables.propagate( e ); 
     83                } 
     84                initialized = true; 
     85        } 
     86 
     87        public int bufferSize() { 
     88                return bufferSize; 
     89        } 
     90 
     91        public int eventsInBatch() { 
     92                return eventsInBatch; 
     93        } 
     94 
     95        public synchronized boolean isInitialized() { 
     96                return initialized; 
     97        } 
     98 
     99        public synchronized EventFactory<E> eventFactory() { 
     100                checkState( initialized, "Task is not initialized" ); 
     101                return eventFactory; 
     102        } 
     103 
     104        protected abstract BaseEnqueuer[] initializeEnqueuers( final Executor threadPool, 
     105                                                               final IUsefulWorkEmulator<E>[] emulators, 
     106                                                               final EventFactory<E> factory ) throws Exception; 
     107 
     108        @Override 
     109        public final BenchmarkResult[] execute( final long millisToBogus, 
     110                                                final long millisToMeasure, 
     111                                                final int measurements ) { 
     112                checkState( initialized, "Can't execute uninitialized task" ); 
     113                checkState( !executing, "Previous execution not finished yet" ); 
     114                checkState( eventEnqueuers != null, "eventEnqueuer can't be null" ); 
     115                checkArgument( measurements >= 1, "measurements(%s) can't be <1", measurements ); 
     116                executing = true; 
     117                try { 
     118                        return executeImpl( millisToBogus, millisToMeasure, measurements ); 
     119                } catch( Exception e ) { 
     120                        throw Throwables.propagate( e ); 
     121                } finally { 
     122                        executing = false; 
     123                } 
     124        } 
     125 
     126        @Override 
     127        public void terminate() { 
     128                checkState( initialized, "Can't terminate uninitialized task" ); 
     129                checkState( eventEnqueuers != null, "eventEnqueuer can't be null" ); 
     130                for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     131                        eventEnqueuer.terminate(); 
     132                } 
     133        } 
     134 
     135        protected BenchmarkResult[] executeImpl( final long millisToBogus, 
     136                                                 final long millisToMeasure, 
     137                                                 final int measurements ) throws Exception { 
     138                for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     139                        eventEnqueuer.waitForInitialized(); 
     140                } 
     141 
     142                for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     143                        eventEnqueuer.start(); 
     144                } 
     145 
     146                for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     147                        eventEnqueuer.waitForStarting(); 
     148                } 
     149 
     150                //dequeue some packets before start timer to warm/fill up pipeline (not JIT!) 
     151                final BenchmarkResult bogusResult = dequeueWhileTimeIsNotElapsed( millisToBogus ); 
     152 
     153                final long msToOneMeasure = millisToMeasure / measurements; 
     154                checkState( 
     155                                msToOneMeasure > 1, 
     156                                "msToMeasure(%s) = millisToMeasure(%s)/measurements(%s) must be > 1", 
     157                                msToOneMeasure, millisToMeasure, measurements 
     158                ); 
     159                final BenchmarkResult[] results = new BenchmarkResult[measurements]; 
     160                long packetsCompleted = 0; 
     161                for( int i = 0; i < results.length; i++ ) { 
     162                        results[i] = dequeueWhileTimeIsNotElapsed( msToOneMeasure ); 
     163                        packetsCompleted += results[i].packetsCompleted; 
     164                } 
     165 
     166                long eventsDequeued = bogusResult.packetsCompleted + packetsCompleted; 
     167                for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     168                        eventEnqueuer.pause(); 
     169                } 
     170                //wait for packets exhausted 
     171                Thread.yield(); 
     172                //здесь периодически возникает ситуация, когда мы подвисаем. Понять причины точно не удается 
     173                //но чтобы не подвешивать эксперименты я здесь просто ввел ограничение времени 
     174                final long startedAt = System.currentTimeMillis(); 
     175                do { 
     176                        boolean allStopped = true; 
     177                        long totalEnqueued = 0; 
     178                        for( final BaseEnqueuer eventEnqueuer : eventEnqueuers ) { 
     179                                allStopped &= !eventEnqueuer.is( State.WAITING ); 
     180                                totalEnqueued += eventEnqueuer.eventsEnqueued(); 
     181                        } 
     182                        if( !allStopped ) { 
     183                        } else if( totalEnqueued > eventsDequeued ) { 
     184                                dequeueBatchEvents( eventsInBatch ); 
     185                                eventsDequeued += eventsInBatch; 
     186                        } else { 
     187                                break; 
     188                        } 
     189                        Thread.yield(); 
     190                        final long current = System.currentTimeMillis(); 
     191                        if( current - startedAt > MAX_WAIT_TIME ) { 
     192                                //идите в жопу, я -- заканчиваю 
     193                                break; 
     194                        } 
     195                } while( true ); 
     196                return results; 
     197        } 
     198 
     199        private BenchmarkResult dequeueWhileTimeIsNotElapsed( final long timeLimit ) throws Exception { 
     200                long eventsDequeued = 0; 
     201                final long startedAt = System.currentTimeMillis(); 
     202                final long startedAtNs = System.nanoTime(); 
     203                while( true ) { 
     204                        dequeueBatchEvents( eventsInBatch ); 
     205                        eventsDequeued += eventsInBatch; 
     206 
     207                        final long finishedAtMs = System.currentTimeMillis(); 
     208                        final long elapsedMs = finishedAtMs - startedAt; 
     209                        if( elapsedMs >= timeLimit ) { 
     210                                final long finishedAtNs = System.nanoTime(); 
     211                                final long elapsedNs = finishedAtNs - startedAtNs; 
     212 
     213                                final BenchmarkResult result = new BenchmarkResult( 
     214                                                elapsedMs, 
     215                                                elapsedNs, 
     216                                                eventsDequeued 
     217                                ); 
     218                                return result; 
     219                        } 
     220                } 
     221        } 
     222 
     223        protected abstract long dequeueBatchEvents( final int eventsInBatch ) throws Exception; 
     224 
     225        @Override 
     226        public String toString() { 
     227                return String.format( 
     228                                "%s[workers:%d][buffer:%d]", 
     229                                getClass().getSimpleName(), 
     230                                workersRequired(), 
     231                                bufferSize() 
     232                ); 
     233        } 
     234 
     235        protected abstract static class EventEnqueuer<E> extends BaseEnqueuer<E> { 
     236 
     237                private final EventFactory<E> eventFactory; 
     238 
     239 
     240                public EventEnqueuer( final int eventsInBatch, 
     241                                      final EventFactory<E> eventFactory ) { 
     242                        super( eventsInBatch ); 
     243                        this.eventFactory = eventFactory; 
     244                } 
     245 
     246                @Override 
     247                protected void enqueueEventsBatch( final int eventsInBatch ) throws Exception { 
     248                        enqueueEventsBatch( eventsInBatch, eventFactory ); 
     249                } 
     250 
     251                protected abstract void enqueueEventsBatch( final int eventsInBatch, 
     252                                                            final EventFactory<E> factory ) throws Exception; 
     253 
     254        } 
    234255 
    235256} 
  • Tests/JAVA/test/src/main/java/test/threads/queue/stressed/DxNTask.java

    r498 r563  
    11package test.threads.queue.stressed; 
    22 
    3 import java.util.List; 
     3import java.util.*; 
    44import java.util.concurrent.Executor; 
    55 
     
    99import org.apache.commons.logging.LogFactory; 
    1010import test.helpers.Config; 
     11import test.threads.queue.common.BaseEnqueuer; 
    1112 
    1213/** 
     
    1718 */ 
    1819public class DxNTask<E> extends AbstractTask<E> { 
    19     private static final Log log = LogFactory.getLog( DxNTask.class ); 
     20        private static final Log log = LogFactory.getLog( DxNTask.class ); 
    2021 
    21     private final WaitStrategy waitStrategy; 
    22     private final boolean publishInBatch; 
     22        private final WaitStrategy waitStrategy; 
     23        private final boolean publishInBatch; 
    2324 
    24     private final int nodes; 
     25        private final int nodes; 
    2526 
    26     public DxNTask( final Config config ) throws ClassNotFoundException, InstantiationException, IllegalAccessException { 
    27         super( config ); 
     27        public DxNTask( final Config config ) throws ClassNotFoundException, InstantiationException, IllegalAccessException { 
     28                super( config ); 
    2829 
    29         this.waitStrategy = config.newInstanceByClass( 
    30                 "disruptor.wait-strategy", 
    31                 new BusySpinWaitStrategy() 
    32         ); 
     30                this.waitStrategy = config.newInstanceByClass( 
     31                                "disruptor.wait-strategy", 
     32                                new BusySpinWaitStrategy() 
     33                ); 
    3334 
    34         this.nodes = config.getAsInt( "pipeline.nodes" ); 
    35         this.publishInBatch = config.getAsBoolean( "disruptor.publish-in-batch", false ); 
    36     } 
     35                this.nodes = config.getAsInt( "pipeline.nodes" ); 
     36                this.publishInBatch = config.getAsBoolean( "disruptor.publish-in-batch", false ); 
     37        } 
    3738 
    3839 
    39     private RingBuffer<E> ringBuffer; 
    40     private EventProcessor[] processors; 
     40        private RingBuffer<E> ringBuffer; 
     41        private EventProcessor[] processors; 
    4142 
    42     @Override 
    43     public int workersRequired() { 
    44         return nodes; 
    45     } 
     43        @Override 
     44        public int workersRequired() { 
     45                return nodes; 
     46        } 
    4647 
    47     private SequenceBarrier lastSequenceBarrier; 
    48     private final Sequence lastSequence = new Sequence( Sequencer.INITIAL_CURSOR_VALUE ); 
    49     private IUsefulWorkEmulator<E> lastEmulator; 
     48        private SequenceBarrier lastSequenceBarrier; 
     49        private final Sequence lastSequence = new Sequence( Sequencer.INITIAL_CURSOR_VALUE ); 
     50        private IUsefulWorkEmulator<E> lastEmulator; 
    5051 
    51     @Override 
    52     protected EventEnqueuer _initialize( final Executor threadPool, 
    53                                          final IUsefulWorkEmulator<E>[] emulators, 
    54                                          final EventFactory<E> eventFactory ) { 
    55         final ClaimStrategy claimStrategy = new SingleThreadedClaimStrategy( bufferSize() ); 
     52        @Override 
     53        protected BaseEnqueuer[] initializeEnqueuers( final Executor threadPool, 
     54                                                      final IUsefulWorkEmulator<E>[] emulators, 
     55                                                      final EventFactory<E> eventFactory ) { 
     56                final ClaimStrategy claimStrategy = new SingleThreadedClaimStrategy( bufferSize() ); 
    5657 
    57         ringBuffer = new RingBuffer<E>( 
    58                 eventFactory, 
    59                 claimStrategy, 
    60                 waitStrategy 
    61         ); 
     58                ringBuffer = new RingBuffer<E>( 
     59                                eventFactory, 
     60                                claimStrategy, 
     61                                waitStrategy 
     62                ); 
    6263 
    63         SequenceBarrier previousBarrier = ringBuffer.newBarrier(); 
    64         final List<EventProcessor> processors = Lists.newArrayList(); 
    65         for ( int i = 0; i < nodes - 1; i++ ) { 
    66             final IUsefulWorkEmulator<E> emulator = emulators[i]; 
    67             final PassThroughHandler<E> handler = new PassThroughHandler<E>( emulator ); 
    68             final BatchEventProcessor<E> processor = new BatchEventProcessor<E>( 
    69                     ringBuffer, 
    70                     previousBarrier, 
    71                     handler 
    72             ); 
    73             previousBarrier = ringBuffer.newBarrier( processor.getSequence() ); 
    74             processors.add( processor ); 
    75         } 
     64                SequenceBarrier previousBarrier = ringBuffer.newBarrier(); 
     65                final List<EventProcessor> processors = Lists.newArrayList(); 
     66                for( int i = 0; i < nodes - 1; i++ ) { 
     67                        final IUsefulWorkEmulator<E> emulator = emulators[i]; 
     68                        final PassThroughHandler<E> handler = new PassThroughHandler<E>( emulator ); 
     69                        final BatchEventProcessor<E> processor = new BatchEventProcessor<E>( 
     70                                        ringBuffer, 
     71                                        previousBarrier, 
     72                                        handler 
     73                        ); 
     74                        previousBarrier = ringBuffer.newBarrier( processor.getSequence() ); 
     75                        processors.add( processor ); 
     76                } 
    7677 
    77         this.processors = processors.toArray( new EventProcessor[0] ); 
     78                this.processors = processors.toArray( new EventProcessor[0] ); 
    7879 
    79         lastSequenceBarrier = previousBarrier; 
    80         lastEmulator = emulators[emulators.length - 1]; 
     80                lastSequenceBarrier = previousBarrier; 
     81                lastEmulator = emulators[emulators.length - 1]; 
    8182 
    82         ringBuffer.setGatingSequences( lastSequence ); 
     83                ringBuffer.setGatingSequences( lastSequence ); 
    8384 
    84         for ( final EventProcessor processor : processors ) { 
    85             threadPool.execute( processor ); 
    86         } 
     85                for( final EventProcessor processor : processors ) { 
     86                        threadPool.execute( processor ); 
     87                } 
    8788 
    88         return new Enqueuer( eventsInBatch(), eventFactory, ringBuffer ); 
    89     } 
     89                return new BaseEnqueuer[] { 
     90                                new Enqueuer( eventsInBatch(), eventFactory, ringBuffer ) 
     91                }; 
     92        } 
    9093 
    91     @Override 
    92     protected long dequeueBatchEvents( final int eventsInBatch ) throws Exception { 
    93         final long seq = lastSequence.get(); 
    94         final long maxSequence = seq + eventsInBatch; 
     94        @Override 
     95        protected long dequeueBatchEvents( final int eventsInBatch ) throws Exception { 
     96                final long seq = lastSequence.get(); 
     97                final long maxSequence = seq + eventsInBatch; 
    9598 
    9699 
    97         long nextSequence = lastSequence.get() + 1L; 
    98         while ( nextSequence <= maxSequence ) { 
    99             final long availableSequence = Math.min( 
    100                     lastSequenceBarrier.waitFor( nextSequence ), 
    101                     maxSequence 
    102             ); 
    103             while ( nextSequence <= availableSequence ) { 
    104                 final E event = ringBuffer.get( nextSequence ); 
    105                 lastEmulator.spendCPUTime( event ); 
     100                long nextSequence = lastSequence.get() + 1L; 
     101                while( nextSequence <= maxSequence ) { 
     102                        final long availableSequence = Math.min( 
     103                                        lastSequenceBarrier.waitFor( nextSequence ), 
     104                                        maxSequence 
     105                        ); 
     106                        while( nextSequence <= availableSequence ) { 
     107                                final E event = ringBuffer.get( nextSequence ); 
     108                                lastEmulator.spendCPUTime( event ); 
    106109 
    107                 nextSequence++; 
    108             } 
    109         } 
    110         lastSequence.set( nextSequence - 1L ); 
     110                                nextSequence++; 
     111                        } 
     112                } 
     113                lastSequence.set( nextSequence - 1L ); 
    111114 
    112         return 0; 
    113     } 
     115                return 0; 
     116        } 
    114117 
    115118 
    116     @Override 
    117     public void terminate() { 
    118         super.terminate(); 
     119        @Override 
     120        public void terminate() { 
     121                super.terminate(); 
    119122 
    120         for ( final EventProcessor processor : processors ) { 
    121             processor.halt(); 
    122         } 
    123     } 
     123                for( final EventProcessor processor : processors ) { 
     124                        processor.halt(); 
     125                } 
     126        } 
    124127 
    125     @Override 
    126     public String toString() { 
    127         return String.format( 
    128                 "%s[%s][%s]", 
    129                 super.toString(), 
    130                 waitStrategy.getClass().getSimpleName(), 
    131                 ( publishInBatch ) ? ( "batching" ) : ( "no-batching" ) 
    132         ); 
    133     } 
     128        @Override 
     129        public String toString() { 
     130                return String.format( 
     131                                "%s[%s][%s]", 
     132                                super.toString(), 
     133                                waitStrategy.getClass().getSimpleName(), 
     134                                ( publishInBatch ) ? ( "batching" ) : ( "no-batching" ) 
     135                ); 
     136        } 
    134137 
    135     private static class PassThroughHandler<E> implements EventHandler<E> { 
     138        private static class PassThroughHandler<E> implements EventHandler<E> { 
    136139 
    137         private final IUsefulWorkEmulator<E> emulator; 
     140                private final IUsefulWorkEmulator<E> emulator; 
    138141 
    139         public PassThroughHandler( final IUsefulWorkEmulator<E> emulator ) { 
    140             this.emulator = emulator; 
    141         } 
     142                public PassThroughHandler( final IUsefulWorkEmulator<E> emulator ) { 
     143                        this.emulator = emulator; 
     144                } 
    142145 
    143         @Override 
    144         public void onEvent( final E event, 
    145                              final long sequence, 
    146                              final boolean endOfBatch ) throws Exception { 
    147             emulator.spendCPUTime( event ); 
    148         } 
    149     } 
     146                @Override 
     147                public void onEvent( final E event, 
     148                                     final long sequence, 
     149                                     final boolean endOfBatch ) throws Exception { 
     150                        emulator.spendCPUTime( event ); 
     151                } 
     152        } 
    150153 
    151     private class Enqueuer extends EventEnqueuer<E> { 
    152         private final RingBuffer<E> ringBuffer; 
     154        private class Enqueuer extends EventEnqueuer<E> { 
     155                private final RingBuffer<E> ringBuffer; 
    153156 
    154         private Enqueuer( final int eventsInBatch, 
    155                           final EventFactory<E> factory, 
    156                           final RingBuffer<E> ringBuffer ) { 
    157             super( eventsInBatch, factory ); 
    158             this.ringBuffer = ringBuffer; 
    159         } 
     157                private Enqueuer( final int eventsInBatch, 
     158                                  final EventFactory<E> factory, 
     159                                  final RingBuffer<E> ringBuffer ) { 
     160                        super( eventsInBatch, factory ); 
     161                        this.ringBuffer = ringBuffer; 
     162                } 
    160163 
    161         @Override 
    162         protected void enqueueEventsBatch( final int eventsInBatch, 
    163                                            final EventFactory<E> factory ) throws Exception { 
    164             if ( publishInBatch ) { 
    165                 final BatchDescriptor batch = ringBuffer.newBatchDescriptor( eventsInBatch ); 
    166                 ringBuffer.next( batch ); 
     164                @Override 
     165                protected void enqueueEventsBatch( final int eventsInBatch, 
     166                                                   final EventFactory<E> factory ) throws Exception { 
     167                        if( publishInBatch ) { 
     168                                final BatchDescriptor batch = ringBuffer.newBatchDescriptor( eventsInBatch ); 
     169                                ringBuffer.next( batch ); 
    167170//                for(long sequence=batch.getStart();sequence<batch.getEnd();sequence++){ 
    168171//                    final E msg = ringBuffer.get( sequence ); 
    169172//                } 
    170                 ringBuffer.publish( batch ); 
    171             } else { 
    172                 for ( int i = 0; i < eventsInBatch; i++ ) { 
    173                     final long nextSequence = ringBuffer.next(); 
    174                     ringBuffer.get( nextSequence ); 
    175                     ringBuffer.publish( nextSequence ); 
    176                 } 
    177             } 
    178         } 
    179     } 
     173                                ringBuffer.publish( batch ); 
     174                        } else { 
     175                                for( int i = 0; i < eventsInBatch; i++ ) { 
     176                                        final long nextSequence = ringBuffer.next(); 
     177                                        ringBuffer.get( nextSequence ); 
     178                                        ringBuffer.publish( nextSequence ); 
     179                                } 
     180                        } 
     181                } 
     182        } 
    180183} 
  • Tests/JAVA/test/src/main/java/test/threads/queue/stressed/ITask.java

    r523 r563  
    11package test.threads.queue.stressed; 
    22 
    3 import java.util.Locale; 
     3import java.util.*; 
    44import java.util.concurrent.Executor; 
    55 
     
    88 
    99/** 
    10  * fixme: Class ITask is for porn 
    11  * 
    1210 * @author cheremin 
    1311 * @since 27.02.12,  14:06 
     
    1513public interface ITask<E> { 
    1614 
    17     public int workersRequired(); 
     15        public int workersRequired(); 
    1816 
    19     public void initialize( final Executor threadPool, 
    20                             final IUsefulWorkEmulator<E>[] emulators, 
    21                             final EventFactory<E> eventFactory ); 
     17        public void initialize( final Executor threadPool, 
     18                                final IUsefulWorkEmulator<E>[] emulators, 
     19                                final EventFactory<E> eventFactory ); 
    2220 
    23     public BenchmarkResult[] execute( final long millisToBogus, 
    24                                     final long millisToMeasure, 
    25                                     final int turns ); 
     21        public BenchmarkResult[] execute( final long millisToBogus, 
     22                                          final long millisToMeasure, 
     23                                          final int turns ); 
    2624 
    27     public void terminate(); 
     25        public void terminate(); 
    2826 
    29     @Immutable 
    30     public static class BenchmarkResult { 
    31         public final long timeEllapsedMilliseconds; 
    32         public final long timeEllapsedNanoseconds; 
     27        @Immutable 
     28        public static class BenchmarkResult { 
     29                public final long timeElapsedMilliseconds; 
     30                public final long timeElapsedNanoseconds; 
    3331 
    34         public final long packetsCompleted; 
     32                public final long packetsCompleted; 
    3533 
    36         public BenchmarkResult( final long timeEllapsedMilliseconds, 
    37                                 final long timeEllapsedNanoseconds, 
    38                                 final long packetsCompleted ) { 
    39             this.timeEllapsedMilliseconds = timeEllapsedMilliseconds; 
    40             this.timeEllapsedNanoseconds = timeEllapsedNanoseconds; 
    41             this.packetsCompleted = packetsCompleted; 
    42         } 
     34                public BenchmarkResult( final long timeElapsedMilliseconds, 
     35                                        final long timeElapsedNanoseconds, 
     36                                        final long packetsCompleted ) { 
     37                        this.timeElapsedMilliseconds = timeElapsedMilliseconds; 
     38                        this.timeElapsedNanoseconds = timeElapsedNanoseconds; 
     39                        this.packetsCompleted = packetsCompleted; 
     40                } 
    4341 
    44         public String print( final String taskName, 
    45                              final String payload ) { 
    46             return String.format( 
    47                     Locale.US, 
    48                     "%s@%s: %d packets of %s, takes %d ms (%.1f packets/ms)", 
    49                     taskName, 
    50                     payload, 
    51                     packetsCompleted, payload, 
    52                     timeEllapsedMilliseconds, 
    53                     packetsCompleted * 1.0 / timeEllapsedMilliseconds 
     42                public String print( final String taskName, 
     43                                     final String payload ) { 
     44                        return String.format( 
     45                                        Locale.US, 
     46                                        "%s@%s: %d packets of %s, takes %d ms (%.1f packets/ms)", 
     47                                        taskName, 
     48                                        payload, 
     49                                        packetsCompleted, payload, 
     50                                        timeElapsedMilliseconds, 
     51                                        packetsCompleted * 1.0 / timeElapsedMilliseconds 
    5452 
    55             ); 
    56         } 
     53                        ); 
     54                } 
    5755 
    58         public String printTabular( final String taskName, 
    59                                     final String payloadDescription ) { 
    60             return String.format( 
    61                     Locale.US, 
    62                     "%s@%s\t%d\t%d\t%.1f", 
    63                     taskName, 
    64                     payloadDescription, 
    65                     packetsCompleted, 
    66                     timeEllapsedMilliseconds, 
    67                     packetsCompleted * 1.0 / timeEllapsedMilliseconds 
     56                public String printTabular( final String taskName, 
     57                                            final String payloadDescription ) { 
     58                        return String.format( 
     59                                        Locale.US, 
     60                                        "%s@%s\t%d\t%d\t%.1f", 
     61                                        taskName, 
     62                                        payloadDescription, 
     63                                        packetsCompleted, 
     64                                        timeElapsedMilliseconds, 
     65                                        packetsCompleted * 1.0 / timeElapsedMilliseconds 
    6866 
    69             ); 
    70         } 
    71     } 
     67                        ); 
     68                } 
     69        } 
    7270} 
  • Tests/JAVA/test/src/main/java/test/threads/queue/stressed/SingleThreadedTask.java

    r523 r563  
    77import org.apache.commons.logging.LogFactory; 
    88import test.helpers.Config; 
     9import test.threads.queue.common.BaseEnqueuer; 
    910 
    1011/** 
     
    1516 */ 
    1617public class SingleThreadedTask<E> extends AbstractTask<E> { 
    17     private static final Log log = LogFactory.getLog( SingleThreadedTask.class ); 
     18        private static final Log log = LogFactory.getLog( SingleThreadedTask.class ); 
    1819 
    19     public SingleThreadedTask() { 
    20     } 
     20        public SingleThreadedTask() { 
     21        } 
    2122 
    22     public SingleThreadedTask( final int bufferSize, 
    23                                final int packetsInBatch ) { 
    24         super( bufferSize, packetsInBatch ); 
    25     } 
     23        public SingleThreadedTask( final int bufferSize, 
     24                                   final int packetsInBatch ) { 
     25                super( bufferSize, packetsInBatch ); 
     26        } 
    2627 
    27     public SingleThreadedTask( final Config config ) { 
    28         super( config ); 
    29     } 
     28        public SingleThreadedTask( final Config config ) { 
     29                super( config ); 
     30        } 
    3031 
    31     @Override 
    32     public int workersRequired() { 
    33         // this is fake: we do not actually use 1 additional thread, instead we do all work in 
    34         // calling thread. So we really use 1 worker, but it is calling thread worker, 
    35         // not additional thread worker, as usually 
    36         return 1; 
    37     } 
     32        @Override 
     33        public int workersRequired() { 
     34                // this is fake: we do not actually use 1 additional thread, instead we do all work in 
     35                // calling thread. So we really use 1 worker, but it is calling thread worker, 
     36                // not additional thread worker, as usually 
     37                return 1; 
     38        } 
    3839 
    39     private IUsefulWorkEmulator<E> emulator; 
     40        private IUsefulWorkEmulator<E> emulator; 
    4041 
    41     @Override 
    42     protected EventEnqueuer _initialize( final Executor threadPool, 
    43                                          final IUsefulWorkEmulator<E>[] emulators, 
    44                                          final EventFactory<E> factory ) throws Exception { 
    45         this.emulator = emulators[0]; 
    46         return new EventEnqueuer<E>( eventsInBatch(), factory ) { 
    47             @Override 
    48             protected void enqueueEventsBatch( final int eventsInBatch, 
    49                                                final EventFactory<E> factory ) throws Exception { 
    50                 //do nothing 
    51             } 
    52         }; 
    53     } 
     42        @Override 
     43        protected BaseEnqueuer[] initializeEnqueuers( final Executor threadPool, 
     44                                                      final IUsefulWorkEmulator<E>[] emulators, 
     45                                                      final EventFactory<E> factory ) throws Exception { 
     46                this.emulator = emulators[0]; 
     47                return new BaseEnqueuer[] { new EventEnqueuer<E>( eventsInBatch(), factory ) { 
     48                        @Override 
     49                        protected void enqueueEventsBatch( final int eventsInBatch, 
     50                                                           final EventFactory<E> factory ) throws Exception { 
     51                                //do nothing 
     52                        } 
     53                } }; 
     54        } 
    5455 
    55     @Override 
    56     protected BenchmarkResult[] executeImpl( final long millisToBogus, 
    57                                              final long millisToMeasure, 
    58                                              final int measurements ) throws Exception { 
    59         long eventsDequeued = 0; 
     56        @Override 
     57        protected BenchmarkResult[] executeImpl( final long millisToBogus, 
     58                                                 final long millisToMeasure, 
     59                                                 final int measurements ) throws Exception { 
     60                long eventsDequeued = 0; 
    6061 
    6162 
    6263//        final long msToOneMeasure = millisToMeasure / measurements; 
    63         long eventsAtStart = eventsDequeued; 
    64         long startedAtMs = System.currentTimeMillis(); 
    65         long startedAtNs = System.nanoTime(); 
    66         while ( true ) { 
    67             dequeueBatchEvents( eventsInBatch() ); 
    68             eventsDequeued += eventsInBatch(); 
     64                long eventsAtStart = eventsDequeued; 
     65                long startedAtMs = System.currentTimeMillis(); 
     66                long startedAtNs = System.nanoTime(); 
     67                while( true ) { 
     68                        dequeueBatchEvents( eventsInBatch() ); 
     69                        eventsDequeued += eventsInBatch(); 
    6970 
    70             final long finishedAtMs = System.currentTimeMillis(); 
    71             final long ellapsedMs = finishedAtMs - startedAtMs; 
    72             if ( ellapsedMs >= millisToMeasure ) { 
    73                 final long eventsProcessed = eventsDequeued - eventsAtStart; 
    74                 final long finishedAtNs = System.nanoTime(); 
    75                 final long ellapsedNs = finishedAtNs - startedAtNs; 
     71                        final long finishedAtMs = System.currentTimeMillis(); 
     72                        final long ellapsedMs = finishedAtMs - startedAtMs; 
     73                        if( ellapsedMs >= millisToMeasure ) { 
     74                                final long eventsProcessed = eventsDequeued - eventsAtStart; 
     75                                final long finishedAtNs = System.nanoTime(); 
     76                                final long ellapsedNs = finishedAtNs - startedAtNs; 
    7677 
    77                 final BenchmarkResult result = new BenchmarkResult( 
    78                         ellapsedMs, 
    79                         ellapsedNs, 
    80                         eventsProcessed 
    81                 ); 
    82                 return new BenchmarkResult[]{ result }; 
    83             } 
    84         } 
    85     } 
     78                                final BenchmarkResult result = new BenchmarkResult( 
     79                                                ellapsedMs, 
     80                                                ellapsedNs, 
     81                                                eventsProcessed 
     82                                ); 
     83                                return new BenchmarkResult[] { result }; 
     84                        } 
     85                } 
     86        } 
    8687 
    87     @Override 
    88     protected long dequeueBatchEvents( final int eventsInBatch ) throws Exception { 
    89         final EventFactory<E> factory = eventFactory(); 
    90         for ( int i = 0; i < eventsInBatch; i++ ) { 
    91             final E event = factory.newInstance(); 
    92             emulator.spendCPUTime( event ); 
    93         } 
    94         return 0; 
    95     } 
     88        @Override 
     89        protected long dequeueBatchEvents( final int eventsInBatch ) throws Exception { 
     90                final EventFactory<E> factory = eventFactory(); 
     91                for( int i = 0; i < eventsInBatch; i++ ) { 
     92                        final E event = factory.newInstance(); 
     93                        emulator.spendCPUTime( event ); 
     94                } 
     95                return 0; 
     96        } 
    9697} 
Note: See TracChangeset for help on using the changeset viewer.