As the ponytail (and the tiny bio below) imply, my
posts will focus on technical details such as language semantics,
general programming issues, compiler design, or anything else I feel is
nifty.

Anyone who follows programming languages will probably have noticed Scala cropping up a lot lately.
Scala is an interesting language on several fronts, but the one that
sparked my interest was its strong interoperability with Java.  Academics
often forget that to be useful a language needs to integrate with existing code and libraries, but Scala has clearly been designed with a heavy focus on industrial usability as well as academic power.  When I read about Scala on Android,
I guessed it wouldn't be hard to integrate with StreamBase.  It turns
out I was mostly right (and we will definitely be fixing the small pain
points I hit).  What follows is a guide to embedding Scala into a StreamBase application.

First we have to set up StreamBase Studio to support Scala development.

  1. Install StreamBase 6.4
  2. Add the Scala 2.7.5 Eclipse Plugin to StreamBase Studio (Help -> Software Updates -> "Add Site…")
  3. Manually download the AJDT Weaving Plugin 1.6.4
  4. Unzip the weaving plugin into your studio install
    • make sure the contents of the zip's "plugins" directory goes into
      Studio's "plugins" directory (I accidentally got that wrong the first
      time)
    • Update 8/31/2009:  On Windows, unzipping directly into a Studio install fails with an obscure error message.  Instead, unzip into an empty directory and copy the contents of the created "plugins" and "features" directories into the matching subdirectories of "C:\Program Files\StreamBase Systems\StreamBase.6.4\lib\studio". (You may need to correct that path if your Studio install is in a non-standard location.)
  5. Restart Studio
  6. Enable "JDT Weaving" (Preferences -> JDT Weaving -> "Click to ENABLE")
  7. Restart Studio
  8. Verify that "JDT Weaving" is enabled (Preferences -> JDT Weaving)

Right there you see 95% of the trouble I had.  Definitely easy to do,
once you know what to do, but figuring out that JDT Weaving thing
(especially the specific version needed) took some searching.

Now we can get started using Scala in StreamBase.  The impatient can

  1. Download the sample
  2. Import it (File -> Import … -> Existing Projects into Workspace -> "Select archive file").

or the dedicated can create a new project from scratch

  1. Create a new StreamBase project (Package Explorer -> Right Click -> New -> "StreamBase Project")
  2. Add the Scala Nature to it (Right Click on the Project -> Scala -> "Add Scala Nature")

Yeah, that is really it.  Turns out that Eclipse does some clever and
powerful things by having projects include natures like this.  We now
have a project that includes the Java, Scala, and StreamBase natures
and they all play nicely together.

Let's create a basic Java Operator so we have something with which
to compare (File -> New -> "StreamBase Java Operator").  This
auto-generated operator is good enough for our purposes.  I ended up
deleting all the comments, variables, and whatnot, so I could strip
it down as small as possible for the web.

package com.streambase.sample;

import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.operator.Operator;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;

public class SampleJavaOperator extends Operator implements Parameterizable {
    public SampleJavaOperator() {
        setPortHints(1, 1);
        setDisplayName(this.getClass().getSimpleName());
        setShortDisplayName(this.getClass().getSimpleName());
    }

    public void typecheck() throws TypecheckException {
        requireInputPortCount(1);
        setOutputSchema(0, getInputSchema(0));
    }

    public void processTuple(int inputPort, Tuple tuple) throws StreamBaseException {
        sendOutput(0, tuple);
    }
}

Let's try a naive translation into Scala.

package com.streambase.sample;

import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.operator.Operator;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;

class SampleScalaOperator extends Operator with Parameterizable {
    // In Scala the class body is the primary constructor, so these
    // calls run when the operator is constructed.
    setPortHints(1, 1);
    setDisplayName(this.getClass().getSimpleName());
    setShortDisplayName(this.getClass().getSimpleName());

    override def typecheck(): Unit = {
        requireInputPortCount(1);
        setOutputSchema(0, getInputSchema(0));
    }

    override def processTuple(port: Int, tuple: Tuple): Unit = {
        sendOutput(0, tuple);
    }
}

Wow, that wasn't too hard.  Of course, it didn't save us any code either, but
it does give us enough to test and figure out if it works at all.  I
guess we need to make a simple sbapp to exercise the operators we
have just written.

[Image: SimpleScalaApp]
Yeah, that was easy.  Alright, you caught me, I cheated.  Studio does
not (yet) automatically find operators written in Scala, so I had to
hand edit the sbapp file.  After that, the app runs and behaves just
as you would expect.
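
One Scala detail worth a quick sketch when translating a Java operator: as far as I know, a Java constructor does not carry over as a method named after the class.  Scala treats such a def as an ordinary method that nothing calls on construction, while statements placed directly in the class body act as the primary constructor.  The snippet below is only an illustration under that assumption.  Base, MethodNamedLikeClass, and BodyAsConstructor are made-up names, and Base is a stand-in, not the StreamBase Operator API.

```scala
// Hypothetical stand-in for a framework base class (NOT the StreamBase Operator API).
class Base {
  var portHints: Option[(Int, Int)] = None
  def setPortHints(in: Int, out: Int): Unit = { portHints = Some((in, out)) }
}

// A def named after the class is just a method; `new` never calls it.
class MethodNamedLikeClass extends Base {
  def MethodNamedLikeClass = { setPortHints(1, 1) }
}

// Statements in the class body form the primary constructor and run on `new`.
class BodyAsConstructor extends Base {
  setPortHints(1, 1)
}

object ConstructorDemo {
  def main(args: Array[String]): Unit = {
    println(new MethodNamedLikeClass().portHints) // None
    println(new BodyAsConstructor().portHints)    // Some((1,1))
  }
}
```

So if an operator's display name or port hints ever seem to be missing, checking where the setup calls live is a cheap first step.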

Now for a slightly less trivial example.  I hear that Scala has a really nice concurrency primitive called an actor.
Well, StreamBase has a multi-threaded architecture, so let's see if the
two can be made to play nicely together.  Once again, here is a full project that you can import as above.

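import scala.actors.Actor
import scala.actors.Actor._

import com.streambase.sb.Tuple
import com.streambase.sb.operator.Operator
import com.streambase.sb.operator.Parameterizable

class SampleScalaOperator extends Operator with Parameterizable {
    // Single actor that owns all output: forwards tuples until it receives None.
    private object coordinator extends Actor {
        def act = {
            loop {
                receive {
                    case t: Tuple => sendOutput(0, t)
                    case None => exit()
                }
            }
        }
    }

    // The class body is the primary constructor, so this setup runs on construction.
    setPortHints(1, 1)
    val name = getClass.getSimpleName
    setDisplayName(name)
    setShortDisplayName(name)

    override def typecheck(): Unit = {
        requireInputPortCount(1)
        setOutputSchema(0, getInputSchema(0))
    }

    // Spawn a short-lived actor per tuple; it sends the tuple to the
    // coordinator "times" times, sleeping "delays" milliseconds before each send.
    override def processTuple(port: Int, t: Tuple): Unit = {
        actor {
            val times = t.getInt("times")
            val delays = t.getLong("delays")
            for (i <- 0 until times) {
                Thread.sleep(delays)
                coordinator ! t
            }
        }
    }

    override def init(): Unit = { coordinator.start }
    override def shutdown(): Unit = { coordinator ! None }
}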

Don't
forget to edit the input schema for your app to include an int field
named "times" and a long field named "delays".  Now we can run the app,
enqueue tuples, and watch the delayed tuples arriving out of order.  We
could have simply called "sendOutput" from the actor in "processTuple",
but I wanted to include actors sending messages to other actors instead
of just being glorified threads.
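
If you want to poke at the worker-to-coordinator pattern without StreamBase installed, here is a minimal standalone sketch of the same idea.  It is not the operator above: it swaps actors for plain threads and a java.util.concurrent.LinkedBlockingQueue so that it runs on its own, and the names CoordinatorSketch, Request, Emit, Stop, and simulate are all made up for illustration.  Request stands in for a tuple carrying the "times" and "delays" fields, and Stop plays the role of the None message.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object CoordinatorSketch {
  // Messages the coordinator understands (hypothetical; Stop mirrors None in the post).
  sealed trait Msg
  final case class Emit(label: String) extends Msg
  case object Stop extends Msg

  // Hypothetical stand-in for a tuple with "times" and "delays" fields.
  final case class Request(label: String, times: Int, delayMillis: Long)

  /** Starts one worker per request; returns labels in the order the coordinator received them. */
  def simulate(requests: Seq[Request]): Seq[String] = {
    val mailbox = new LinkedBlockingQueue[Msg]()
    val emitted = ArrayBuffer.empty[String] // written only by the coordinator thread

    // Coordinator: the single consumer of the mailbox, like the coordinator actor.
    val coordinator = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          mailbox.take() match {
            case Emit(label) => emitted += label
            case Stop        => running = false
          }
        }
      }
    })
    coordinator.start()

    // Workers: sleep, then send, repeated `times` times, like the per-tuple actor.
    val workers = requests.map { r =>
      val t = new Thread(new Runnable {
        def run(): Unit = {
          for (_ <- 0 until r.times) {
            Thread.sleep(r.delayMillis)
            mailbox.put(Emit(r.label))
          }
        }
      })
      t.start()
      t
    }

    workers.foreach(_.join()) // wait until every worker has sent its messages
    mailbox.put(Stop)         // then ask the coordinator to finish
    coordinator.join()        // join makes the coordinator's writes visible here
    emitted.toList
  }

  def main(args: Array[String]): Unit = {
    val out = simulate(Seq(Request("slow", 1, 300L), Request("fast", 2, 20L)))
    println(out.mkString(", "))
  }
}
```

With delays that far apart, the "fast" labels will typically show up before "slow" even though "slow" was submitted first, which is the same out-of-order effect described above.  Thread scheduling makes no hard promise about that ordering, though; only the total count is guaranteed.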

All in all, really solid work on the part of the Scala developers.  I am definitely looking forward to trying Scala on some larger projects.  Hopefully, I will be able to share more as I learn it.

(Matt Fowles is a Senior Software Engineer for StreamBase, whose interests include programming language, compiler, and virtual machine design.)

 
