07 April 2015

I set out a few days back to write a simple Groovy script that would accomplish the following:

  • Send data to a Kafka topic from X concurrent producers

  • Run for a set duration

  • Achieve a set number of sends per minute

To do so, I decided to use the GPars framework, which is a "concurrency and parallelism library for Java and Groovy that gives you a number of high-level abstractions for writing concurrent and parallel code in Groovy." I also used the Google Guava RateLimiter class to control how quickly data is produced. The resulting script is pretty simple:

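@Grapes([
    @Grab(group='org.codehaus.gpars', module='gpars', version='1.2.1'),
    @Grab(group='com.google.guava', module='guava', version='18.0'),
    @GrabExclude('commons-logging:commons-logging')])
import com.google.common.util.concurrent.RateLimiter
import groovyx.gpars.GParsExecutorsPool
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit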

def cli = new CliBuilder(usage:'groovy loadTest.groovy ')
cli.p(longOpt:'parallelism', args: 1, argName: 'parallelism', 'The number of producers to execute in parallel.  Defaults to 10.')
cli.d(longOpt:'duration', args: 1, argName: 'duration', 'The duration of the test in minutes.  Defaults to 1 minute.')
cli.r(longOpt:'target-rate', args: 1, argName: 'targetRate', 'The target aggregate rate of messages to be produced per minute.  Defaults to 60.')
cli.h(longOpt:'help', 'Displays the usage information for this script.')

def run = { ->
    // Do the actual work here (e.g. publish a message, etc)
}

def setup = { ->
    // Do any pre-test setup here (e.g. retrieving data, configuration, etc)
}

def options = cli.parse(args)

if(options?.h) {
    cli.usage()
} else {
   // Command-line values arrive as strings, so convert each one to its numeric type up front.
   def duration = TimeUnit.MINUTES.toMillis((options?.d ?: 1) as long)
   def parallelism = (options?.p ?: 10) as int
   def targetRate = (options?.r ?: 60) as double

   println '***********************************************'
   println 'Load Test'
   println "Test will run for ${TimeUnit.MILLISECONDS.toMinutes(duration)} minute(s) using ${parallelism} producers with a target rate of ${targetRate} messages per minute."

   setup()

   println 'Starting the test....'

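   // RateLimiter works in permits per second, so convert the per-minute target.
   def rateLimiter = RateLimiter.create((targetRate as double)/TimeUnit.MINUTES.toSeconds(1))
   def startTime = System.currentTimeMillis()
   def total = 0
   while((System.currentTimeMillis() - startTime) < duration) {
       // Each iteration gets a fresh pool, which withPool shuts down once the closure returns.
       GParsExecutorsPool.withPool(parallelism as int) { ExecutorService service ->
           (parallelism as int).times {
               // Block until the limiter hands out a permit, then queue one unit of work.
               rateLimiter.acquire()
               service.submit({ run() } as Runnable)
           }
           total += parallelism as int
       }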
       println "Test iteration complete.  Total sent so far: ${total}, ${TimeUnit.MILLISECONDS.toSeconds(duration - (System.currentTimeMillis() - startTime))} second(s) remaining."
   }
   println 'Test complete.'
   System.exit(0)
}

The key points in the script above are the use of GParsExecutorsPool to create a thread pool of the configured size and the use of RateLimiter to slow the execution loop down to the target rate. All of this runs inside a while loop that keeps going until the requested duration has elapsed. The end result is a simple script that generates concurrent request load at a target rate for a set duration.
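
To see the rate limiting in isolation, here is a minimal sketch that paces a plain counter instead of a real send. The 600-per-minute target, the `permitsPerSecond` helper, and the 20-iteration loop are assumptions made up for illustration; only `RateLimiter.create` and `acquire` come from the script.

```groovy
@Grab(group='com.google.guava', module='guava', version='18.0')
import com.google.common.util.concurrent.RateLimiter

// Hypothetical helper: RateLimiter expects permits per second,
// so a per-minute target has to be divided by 60.
def permitsPerSecond = { double perMinute -> perMinute / 60.0d }

// Assumed target for this sketch: 600 sends per minute, i.e. 10 per second.
def limiter = RateLimiter.create(permitsPerSecond(600.0d))

def start = System.nanoTime()
int sent = 0
20.times {
    limiter.acquire()   // blocks until the next permit is available
    sent++              // stand-in for the real send
}
def elapsedMillis = (System.nanoTime() - start) / 1000000

println "Sent ${sent} in ${elapsedMillis} ms"
```

At 10 permits per second, the 20 acquisitions should be spread over roughly two seconds rather than completing immediately, which is the same effect the limiter has on the submit loop in the script.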

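The script builds a new pool on every pass through the while loop. One possible variation, which is not what the script above does, is to create the pool once and run the duration check inside it. In this rough sketch the one-second duration, the pool size of 4, the `Thread.sleep` pacing, and the counter that stands in for real work are all assumptions.

```groovy
@Grab(group='org.codehaus.gpars', module='gpars', version='1.2.1')
import groovyx.gpars.GParsExecutorsPool
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

def durationMillis = TimeUnit.SECONDS.toMillis(1)  // assumed short run for the sketch
int workers = 4                                     // assumed pool size
def completed = new AtomicInteger()                 // counts tasks that actually ran
int submitted = 0

def deadline = System.currentTimeMillis() + durationMillis
GParsExecutorsPool.withPool(workers) { ExecutorService service ->
    def futures = []
    // Keep submitting work until the requested duration has elapsed.
    while (System.currentTimeMillis() < deadline) {
        futures << service.submit({ completed.incrementAndGet() } as Runnable)
        submitted++
        Thread.sleep(50)  // crude pacing stand-in; the script uses RateLimiter here
    }
    // Wait for every submitted task before leaving the pool.
    futures.each { it.get() }
}

println "Submitted ${submitted}, completed ${completed.get()}"
```

Reusing a single pool avoids repeated thread start-up and shutdown. The trade-off is that there is no longer a natural point between batches at which to print progress.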