Changeset 585 for Tests


Ignore:
Timestamp:
12/04/13 05:16:56 (8 years ago)
Author:
BegemoT
Message:
 
Location:
Tests/JAVA/logger
Files:
5 edited
1 copied

Legend:

Unmodified
Added
Removed
  • Tests/JAVA/logger/logger.iml

    r582 r585  
    77      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> 
    88      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> 
    9       <excludeFolder url="file://$MODULE_DIR$/target" /> 
     9      <sourceFolder url="file://$MODULE_DIR$/target/generated-test-sources/test-annotations" isTestSource="true" /> 
     10      <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/annotations" isTestSource="false" /> 
     11      <excludeFolder url="file://$MODULE_DIR$/target/classes" /> 
     12      <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" /> 
     13      <excludeFolder url="file://$MODULE_DIR$/target/maven-status" /> 
     14      <excludeFolder url="file://$MODULE_DIR$/target/test-classes" /> 
    1015    </content> 
    1116    <orderEntry type="inheritedJdk" /> 
     
    1419    <orderEntry type="library" name="Maven: args4j:args4j:2.0.16" level="project" /> 
    1520    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" /> 
    16     <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" /> 
     21    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" /> 
    1722    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-all:1.3" level="project" /> 
    18     <orderEntry type="library" name="Maven: org.mockito:mockito-core:1.9.5" level="project" /> 
    19     <orderEntry type="library" name="Maven: org.objenesis:objenesis:1.0" level="project" /> 
     23    <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:1.9.5" level="project" /> 
     24    <orderEntry type="library" scope="TEST" name="Maven: org.objenesis:objenesis:1.0" level="project" /> 
    2025    <orderEntry type="library" name="Maven: net.sf.trove4j:trove4j:3.0.3" level="project" /> 
    2126    <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.1" level="project" /> 
    22     <orderEntry type="library" scope="RUNTIME" name="Maven: log4j:log4j:1.2.16" level="project" /> 
     27    <orderEntry type="library" name="Maven: log4j:log4j:1.2.16" level="project" /> 
    2328    <orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.0-beta9" level="project" /> 
    2429    <orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-core:2.0-beta9" level="project" /> 
  • Tests/JAVA/logger/pom.xml

    r582 r585  
    8383            <artifactId>mockito-core</artifactId> 
    8484            <version>1.9.5</version> 
     85            <scope>test</scope> 
    8586        </dependency> 
    8687 
     
    103104            <artifactId>log4j</artifactId> 
    104105            <version>1.2.16</version> 
    105             <scope>runtime</scope> 
     106            <!--<scope>runtime</scope>--> 
    106107        </dependency> 
    107108 
     
    119120 
    120121 
    121 <!-- 
     122        <!-- 
    122123        <dependency> 
    123124            <groupId>commons-cli</groupId> 
     
    289290                <groupId>org.apache.maven.plugins</groupId> 
    290291                <artifactId>maven-shade-plugin</artifactId> 
     292                <version>2.2</version> 
    291293                <executions> 
    292294                    <execution> 
     
    303305                                </transformer> 
    304306                            </transformers> 
     307                            <artifactSet> 
     308                            <excludes> 
     309                              <exclude>*:*:sources</exclude> 
     310                              <exclude>*:*:tests</exclude> 
     311                            </excludes> 
     312                          </artifactSet> 
    305313                        </configuration> 
    306314                    </execution> 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/MessageFormatterImpl.java

    r581 r585  
    44import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
    55 
     6import static com.db.logger.api.impl.logger.DemultiplexingSequencer.INVALID_INDEX; 
    67import static com.db.logger.api.impl.logger.RecordHelper.RecordType.LOG_RECORD; 
    78import static com.google.common.base.Preconditions.checkState; 
     
    5758        @Override 
    5859        public MessageFormatterImpl with( final long value ) { 
    59                 if( position == DemultiplexingSequencer.INVALID_INDEX ) { 
     60                if( position == INVALID_INDEX ) { 
    6061                        return this; 
    6162                } 
    6263                checkState( argumentIndex < argumentsCount, 
    63                             "Only %s arguments allowed", argumentsCount ); 
     64                            "Only %s arguments allowed but %s is", argumentsCount, argumentIndex ); 
    6465 
    65                 argumentIndex++; 
     66                argumentIndex++;//0 reserved for header! 
    6667                buffer.put( 
    6768                                position + argumentIndex, 
     
    7980        @Override 
    8081        public void submit() { 
    81                 if( position == DemultiplexingSequencer.INVALID_INDEX ) { 
     82                if( position == INVALID_INDEX ) { 
    8283                        return; 
    8384                } 
    8485                checkState( argumentIndex == argumentsCount, 
    85                             "early submit: %s %s", argumentIndex, argumentsCount ); 
     86                            "early submit: %s < %s", argumentIndex, argumentsCount ); 
     87 
    8688                final long header = RecordHelper.header( 
    8789                                LOG_RECORD, 
     
    9294 
    9395                argumentIndex = 0; 
    94                 position = DemultiplexingSequencer.INVALID_INDEX; 
     96                position = INVALID_INDEX; 
    9597        } 
    9698} 
  • Tests/JAVA/logger/src/main/java/com/db/logger/api/impl/logger/UnsafeFastLogger.java

    r581 r585  
    22 
    33import java.io.Closeable; 
     4import java.io.IOException; 
     5import java.nio.ByteBuffer; 
    46import java.util.*; 
    57import java.util.concurrent.ConcurrentHashMap; 
     
    911import com.db.logger.api.MessageFormatter; 
    1012import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
     13import com.db.logger.io.storage.RawWriter; 
    1114import com.db.logger.timesource.ITimestampSource; 
     15import com.google.common.base.Throwables; 
    1216import org.apache.commons.logging.Log; 
    1317import org.apache.commons.logging.LogFactory; 
    14  
     18import org.openjdk.jmh.logic.BlackHole; 
     19 
     20import static com.db.logger.api.impl.logger.RecordHelper.*; 
     21import static com.db.logger.api.impl.logger.RecordHelper.isValidHeader; 
    1522import static com.google.common.base.Preconditions.checkArgument; 
    1623 
     
    2734        private final DemultiplexingSequencer sequencer; 
    2835 
    29         private final ITimestampSource timestampSource; 
    30  
    31         private final Reporter reporter; 
     36        private final RawWriter writer; 
     37 
    3238 
    3339        public UnsafeFastLogger( final ThreadFactory threadFactory, 
    34                                  final ITimestampSource timestampSource, 
    3540                                 final ILongsBuffer buffer, 
    36                                  final Reporter reporter ) { 
     41                                 final RawWriter writer ) { 
    3742                checkArgument( threadFactory != null, "threadFactory can't be null" ); 
    38                 checkArgument( timestampSource != null, "timestampSource can't be null" ); 
    3943                checkArgument( buffer != null, "buffer can't be null" ); 
    40                 checkArgument( reporter != null, "reporter can't be null" ); 
     44                checkArgument( writer != null, "writer can't be null" ); 
     45 
    4146 
    4247                this.threadFactory = threadFactory; 
    43                 this.timestampSource = timestampSource; 
    4448                this.buffer = buffer; 
    45                 this.reporter = reporter; 
     49                this.writer = writer; 
    4650 
    4751 
    4852                final int length = buffer.length(); 
    4953                this.sequencer = new DemultiplexingSequencer( length ); 
    50  
    51  
    5254        } 
    5355 
     
    6264 
    6365        @Override 
    64         public synchronized MessageFormatter formatter( final String formatMessage ) { 
     66        public synchronized MessageFormatterImpl formatter( final String formatMessage ) { 
     67                //TODO RC: need to always return new Formated, but with same formatId sor 
     68                // same format! 
    6569                MessageFormatterImpl formatter = formatters.get( formatMessage ); 
    6670                if( formatter == null ) { 
     
    9397        public synchronized void startDraining() { 
    9498                if( drainerThread == null ) { 
     99                        final Drainer drainer = new Drainer( 
     100                                        new ConsumingDrainer( 
     101                                                        buffer, 
     102                                                        writer 
     103                                        ) 
     104                        ); 
    95105                        drainerThread = threadFactory.newThread( 
    96                                         new Drainer() 
     106                                        drainer 
    97107                        ); 
    98108                        drainerThread.start(); 
     
    102112        } 
    103113 
    104         public synchronized void stopDraining() { 
     114        public synchronized void stopDraining() throws InterruptedException { 
    105115                if( drainerThread != null ) { 
    106116                        drainerThread.interrupt(); 
     117                        drainerThread.join(); 
    107118                        drainerThread = null; 
    108119                } 
     
    124135 
    125136                private long reportingPeriodMs = MIN_PERIOD_MS; 
     137 
     138                private final ConsumingDrainer consumer; 
     139 
     140                private Drainer( final ConsumingDrainer consumer ) { 
     141                        this.consumer = consumer; 
     142                } 
    126143 
    127144                @Override 
     
    137154                                        ); 
    138155                                        Thread.sleep( reportingPeriodMs ); 
     156                                } catch( InterruptedException e ) { 
     157                                        log.info( "Reporting engine " + consumer + " interrupted" ); 
    139158                                } catch( Throwable t ) { 
    140                                         log.error( "Reporting engine " + reporter + " error", t ); 
     159                                        log.error( "Reporting engine " + consumer + " error", t ); 
    141160                                } 
    142161                        } 
     
    148167                public int available( final long startSequence, 
    149168                                      final long sentinelSequence ) { 
    150                         final int filledCount = filledEntriesCount( startSequence, sentinelSequence ); 
    151                         if( filledCount > 0 ) { 
    152                                 reporter.process( 
    153                                                 buffer, 
    154                                                 startSequence, 
    155                                                 startSequence + filledCount 
    156                                 ); 
    157                         } 
    158                         //TODO extract marking record 'done' here, instead of Reporter? 
    159                         processedRecords = filledCount; 
     169                        processedRecords = consumer.available( 
     170                                        startSequence, 
     171                                        sentinelSequence 
     172                        ); 
    160173                        return processedRecords; 
    161174                } 
     
    189202                } 
    190203        } 
     204 
     205        private static class ConsumingDrainer implements DemultiplexingSequencer.Drainer { 
     206                private static final int SPINS_PER_TURN = 256; 
     207 
     208                private final ILongsBuffer buffer; 
     209                private final RawWriter writer; 
     210 
     211                private ConsumingDrainer( final ILongsBuffer buffer, 
     212                                          final RawWriter writer ) { 
     213                        this.buffer = buffer; 
     214                        this.writer = writer; 
     215                } 
     216 
     217                private int spinsAvailable; 
     218 
     219                @Override 
     220                public int available( final long startSequence, 
     221                                      final long sentinelSequence ) { 
     222                        spinsAvailable = SPINS_PER_TURN; 
     223                        ByteBuffer output = writer.buffer(); 
     224                        try { 
     225                                for( long pos = startSequence; pos < sentinelSequence; pos++ ) { 
     226                                        final long header = readHeader( pos ); 
     227                                        if( !isValidHeader( header ) ) { 
     228                                                return ( int ) ( pos - startSequence ); 
     229                                        } 
     230                                        final RecordType type = type( header ); 
     231                                        final int formatId = formatId( header ); 
     232                                        final int argumentsCount = argumentsCount( header ); 
     233                                        if( output.remaining() < ( argumentsCount + 1 ) * 8 ) { 
     234                                                writer.flush(); 
     235                                                output = writer.buffer(); 
     236                                        } 
     237                                        output.putLong( header ); 
     238                                        for( int i = 1; i <= argumentsCount; i++ ) { 
     239                                                final long arg = buffer.get( pos + i ); 
     240                                                buffer.put( pos + i, -1 );//need to reclaim all! 
     241                                                output.putLong( arg ); 
     242                                        } 
     243                                        //reclaim slot 
     244 
     245                                        buffer.put( pos, -1 ); 
     246                                        pos += argumentsCount; 
     247                                } 
     248                                return ( int ) ( sentinelSequence - startSequence ); 
     249                        } catch( IOException e ) { 
     250                                throw Throwables.propagate( e ); 
     251                        } 
     252                } 
     253 
     254                private long readHeader( final long pos ) { 
     255                        for(; spinsAvailable >= 0; spinsAvailable-- ) { 
     256                                final long header = buffer.getVolatile( pos ); 
     257                                if( isValidHeader( header ) ) { 
     258                                        return header; 
     259                                } 
     260                        } 
     261                        return -1; 
     262                } 
     263        } 
    191264} 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/FormatterWriteAndDrainBenchmark.java

    r581 r585  
    170170                                for( int i = 1; i <= argumentsCount; i++ ) { 
    171171                                        hole.consume( buffer.get( pos + i ) ); 
     172                                        buffer.put( pos + i, -1 );//needs to reclaim all! 
    172173                                } 
    173174                                buffer.put( pos, -1 ); 
  • Tests/JAVA/logger/src/main/java/com/db/logger/benchmarks/UnsafeFastLoggerBenchmark.java

    r581 r585  
    11package com.db.logger.benchmarks; 
    22 
     3import java.io.IOException; 
     4import java.nio.ByteBuffer; 
     5import java.util.concurrent.ThreadFactory; 
    36import java.util.concurrent.TimeUnit; 
    47import java.util.concurrent.atomic.AtomicInteger; 
    58 
     9import com.db.logger.api.MessageFormatter; 
     10import com.db.logger.api.impl.FluentFormatter; 
    611import com.db.logger.api.impl.logger.*; 
    712import com.db.logger.api.impl.logger.buffer.DirectAccessLongBuffer; 
    813import com.db.logger.api.impl.logger.buffer.ILongsBuffer; 
     14import com.db.logger.io.storage.RawWriter; 
     15import org.apache.log4j.BasicConfigurator; 
    916import org.openjdk.jmh.annotations.*; 
    1017import org.openjdk.jmh.logic.BlackHole; 
    11  
    12 import static com.db.logger.api.impl.logger.RecordHelper.*; 
    13 import static com.db.logger.api.impl.logger.WaitingStrategy.SPINNING; 
    1418 
    1519/** 
     
    2024@OutputTimeUnit( TimeUnit.NANOSECONDS ) 
    2125@State( Scope.Group ) 
    22 public class FormatterWriteAndDrainBenchmark { 
     26public class UnsafeFastLoggerBenchmark { 
    2327        public static final int LENGTH = Integer.getInteger( "length", 1 << 14 ); 
    2428        public static final int CELLS_PER_RECORD = Integer.getInteger( "cells-per-record", 8 );//8longs = 1 cache line 
    2529        public static final int WRITER_BACKOFF = Integer.getInteger( "writer-backoff", 20 ); 
    2630 
    27         public static final WaitingStrategy WAITING_STRATEGY = new WaitingStrategy.LimitedSpinning( 1024 ); 
     31        public static final WaitingStrategy WAITING_STRATEGY = new WaitingStrategy.LimitedSpinning( 1024 * 8 ); 
    2832 
    2933        public static final AtomicInteger ID_GENERATOR = new AtomicInteger( 1 ); 
    3034 
    31         public static final DemultiplexingSequencer.Drainer DRAIN_DUMMY = new DemultiplexingSequencer.Drainer() { 
     35        public static final RawWriter DUMMY_WRITER = new RawWriter() { 
     36                private final ByteBuffer buffer = ByteBuffer.allocateDirect( 1 << 14 ); 
     37 
    3238                @Override 
    33                 public int available( final long startSequence, 
    34                                       final long sentinelSequence ) { 
    35                         return ( int ) ( sentinelSequence - startSequence ); 
     39                public ByteBuffer buffer() { 
     40                        return buffer; 
     41                } 
     42 
     43                @Override 
     44                public void flush() throws IOException { 
     45                        buffer.clear(); 
     46                } 
     47 
     48                @Override 
     49                public void close() throws IOException { 
     50                        buffer.clear(); 
    3651                } 
    3752        }; 
    3853 
    3954        static { 
     55                BasicConfigurator.configure(); 
    4056                System.out.printf( "len=%d, record=%d, backoff=%d\n", LENGTH, CELLS_PER_RECORD, WRITER_BACKOFF ); 
    4157        } 
     
    4460        public DemultiplexingSequencer sequencer; 
    4561 
    46         public DemultiplexingSequencer.Drainer DRAIN_AND_READ; 
     62        public UnsafeFastLogger logger; 
    4763 
    4864        @Setup 
     
    5066                buffer = new DirectAccessLongBuffer( LENGTH ); 
    5167                sequencer = new DemultiplexingSequencer( LENGTH ); 
    52  
    53                 DRAIN_AND_READ = new ConsumingDrainer(); 
     68                logger = new UnsafeFastLogger( 
     69                                new ThreadFactory() { 
     70                                        @Override 
     71                                        public Thread newThread( final Runnable r ) { 
     72                                                return new Thread( r ); 
     73                                        } 
     74                                }, 
     75                                buffer, 
     76                                DUMMY_WRITER 
     77                ); 
     78                logger.startDraining(); 
    5479        } 
    5580 
    5681        @TearDown 
    57         public void tearDown() { 
    58  
     82        public void tearDown() throws Exception { 
     83                logger.stopDraining(); 
    5984        } 
    6085 
     
    79104                        ts.setup( this ); 
    80105                } 
    81                 MessageFormatterImpl formatter = ts.formatter; 
    82                 formatter = formatter.start( WAITING_STRATEGY ); 
    83                 final int count = formatter.argumentsCount(); 
     106                FluentFormatter formatter = ts.formatter.start( WAITING_STRATEGY ); 
     107                final int count = CELLS_PER_RECORD; 
    84108                for( int i = 0; i < count; i++ ) { 
    85109                        formatter = formatter.with( ( long ) i ); 
     
    87111                formatter.submit(); 
    88112 
     113                ts.ops++; 
     114 
    89115                BlackHole.consumeCPU( WRITER_BACKOFF ); 
    90116        } 
    91117 
    92         @GenerateMicroBenchmark 
    93         @Group( "formatAndRead" ) 
    94         @Threads( 1 ) 
    95         public void readingDrainer() { 
    96                 try { 
    97                         sequencer.drainTo( DRAIN_AND_READ ); 
    98 //                      BlackHole.consumeCPU( 100 ); 
    99                 } catch( Throwable e ) { 
    100                         e.printStackTrace(); 
    101                 } 
    102         } 
     118//      @GenerateMicroBenchmark 
     119//      @Group( "formatAndRead" ) 
     120//      @Threads( 1 ) 
     121//      public void readingDrainer() { 
     122//              try { 
     123//                      sequencer.drainTo( DRAIN_AND_READ ); 
     124////                    BlackHole.consumeCPU( 100 ); 
     125//              } catch( Throwable e ) { 
     126//                      e.printStackTrace(); 
     127//              } 
     128//      } 
    103129 
    104130        @State( Scope.Thread ) 
     
    107133 
    108134                public MessageFormatterImpl formatter; 
     135                public long ops = 0; 
    109136 
    110                 public void setup( final FormatterWriteAndDrainBenchmark b ) { 
    111                         formatter = new MessageFormatterImpl( 
    112                                         "", 
    113                                         id, 
    114                                         CELLS_PER_RECORD - 1, 
    115                                         b.buffer, 
    116                                         b.sequencer 
    117                         ); 
     137                public void setup( final UnsafeFastLoggerBenchmark b ) { 
     138                        formatter = b.logger.formatter( id + " %d %f %d %f %d %f %f %f" ); 
    118139                } 
    119140        } 
    120141 
    121142        public static void main( final String[] args ) throws Exception { 
    122                 final FormatterWriteAndDrainBenchmark benchmark = new FormatterWriteAndDrainBenchmark(); 
     143                final UnsafeFastLoggerBenchmark benchmark = new UnsafeFastLoggerBenchmark(); 
    123144 
    124145                benchmark.setup(); 
     
    138159                } 
    139160 
    140                 new Thread() { 
    141                         @Override 
    142                         public void run() { 
    143                                 for(; ; ) { 
    144                                         benchmark.readingDrainer(); 
    145                                 } 
    146                         } 
    147                 }.start(); 
    148  
    149161                Thread.sleep( 300000 ); 
    150162        } 
    151163 
    152         private class ConsumingDrainer implements DemultiplexingSequencer.Drainer { 
    153                 private static final int SPINS_PER_TURN = 256; 
    154  
    155                 private int spinsAvailable; 
    156                 private final BlackHole hole = new BlackHole(); 
    157  
    158                 @Override 
    159                 public int available( final long startSequence, 
    160                                       final long sentinelSequence ) { 
    161                         spinsAvailable = SPINS_PER_TURN; 
    162                         for( long pos = startSequence; pos < sentinelSequence; pos++ ) { 
    163                                 final long header = readHeader( pos ); 
    164                                 if( !isValidHeader( header ) ) { 
    165                                         return ( int ) ( pos - startSequence ); 
    166                                 } 
    167                                 final RecordHelper.RecordType type = type( header ); 
    168                                 final int formatId = formatId( header ); 
    169                                 final int argumentsCount = argumentsCount( header ); 
    170                                 for( int i = 1; i <= argumentsCount; i++ ) { 
    171                                         hole.consume( buffer.get( pos + i ) ); 
    172                                 } 
    173                                 buffer.put( pos, -1 ); 
    174                                 pos += argumentsCount; 
    175                         } 
    176                         return ( int ) ( sentinelSequence - startSequence ); 
    177                 } 
    178  
    179                 private long readHeader( final long pos ) { 
    180                         for(; spinsAvailable >= 0; spinsAvailable-- ) { 
    181                                 final long header = buffer.getVolatile( pos ); 
    182                                 if( isValidHeader( header ) ) { 
    183                                         return header; 
    184                                 } 
    185                         } 
    186                         return -1; 
    187                 } 
    188         } 
    189164} 
Note: See TracChangeset for help on using the changeset viewer.