Using XProc with xDB: A Twitter Archive

Introduction

EMC XProc Engine (Calumet) provides seamless integration with xDB, making it possible to use XProc as an effective tool for storing, accessing, and querying content in an XML database. This article demonstrates using xDB and XProc together on a real-life example application: a Twitter archive.

 

Twitter is a popular social network where users can post short messages, known as tweets or status updates, and share them with other users. Most tweets are public, and Twitter presents them in so-called timelines: for instance, each user has his own “user” timeline containing all of his tweets, and there is also a “public” timeline of all tweets submitted by all Twitter users. Users can also subscribe to other users and view their tweets in an aggregated “friends” timeline.

 

In essence, Twitter can be viewed as a gigantic database of short text messages, with a continuous influx of more than a billion new messages every month. That certainly represents an interesting amount of social data that can be researched and analyzed. Twitter provides APIs for basic access to timelines, looking up user information, and so on, but unfortunately these are far from suitable for high-volume processing or complex querying of the data. Performing even simple tasks, such as searching a single user's timeline, can be quite difficult or tedious.

 

For serious analysis of information stored in the Twitter network, it is often better to transfer the information to an external system and do the processing there. Twitter makes all data available in the form of XML documents, so xDB with its native XML storage and query capabilities is a natural choice for this task.

 

This article presents a library of sample XProc pipelines that show how XProc can be used as an integration tool between the Twitter service and xDB. The pipelines demonstrate use of XProc for retrieving information from Twitter and storing it in xDB. Data in xDB can then be queried using XQuery, or further processed using XProc.

 

This article is intended for XML developers who are interested in learning about interfacing XProc with xDB, and in implementing complex XML processing scenarios in XProc in general. Familiarity with XProc, XQuery, and xDB is beneficial.

 

Prerequisites

To run the sample pipelines, you will need:

 

  • xDB 9.0.4 or higher

  • EMC XProc Engine 1.0.11 or higher

  • An active Twitter account

(The latest versions of xDB and EMC XProc Engine can be obtained from the Documentum XML Software download page.)

 

After installing the tools, create an xDB database called “Twitter” with the Administrator password “secret”. You can use either the xDB Admin tool or the command line:

 

     xdb create-database Twitter --adminpwd secret

 

The next step is enabling xDB support in the XProc Engine. A plug-in for the xDB integration is included in the XProc Engine distribution. That plug-in needs to be enabled and configured in an XProc Engine configuration file. The simplest way is to create a file called twitter-config.xml in your working directory with the following content:

 

<xproc-config xmlns="http://www.emc.com/documentum/xml/xproc/config">
  <plugin class="com.emc.documentum.xml.xproc.plugin.xdb.XDBPlugin">
    <init-param name="bootstrap" value="xhive://localhost:1235"/>
    <init-param name="username" value="Administrator"/>
    <init-param name="password" value="secret"/>
    <init-param name="database" value="Twitter"/>
    <init-param name="cache-pages" value="0"/>
    <init-param name="readonly" value="false"/>
    <init-param name="auto-commit" value="true"/>
  </plugin>
</xproc-config>

 

The above configuration tells the XProc Engine to enable the xDB plug-in and specifies the plug-in initialization parameters, including the xDB bootstrap URL, the database name, and the user information. Setting the value of the readonly parameter to false enables read-write xDB access in the pipelines, and setting auto-commit to true ensures that xDB transactions are committed after each successful pipeline run. If your xDB server is not running on your local machine, or if it uses a different port, adjust the value of the bootstrap parameter accordingly.
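
For example, if your xDB server ran on a (hypothetical) host named dbserver.example.com, only the bootstrap parameter would change:

<init-param name="bootstrap" value="xhive://dbserver.example.com:1235"/>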

 

To test that the xDB plug-in is configured properly, you can use the following command:

 

     calumet -c twitter-config.xml -

 

If everything is set up correctly, the command will block and wait for the XProc pipeline to appear on the standard console input. (You can either provide a pipeline on the standard console input, or simply terminate the process.) If the plug-in fails to initialize (either because it can't connect to the xDB database or because of invalid user credentials), you will get an error message.
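
If you want something to feed to that command, even a pipeline that never touches xDB will do, because the plug-in is initialized when the engine starts. A minimal sketch:

<p:declare-step xmlns:p="http://www.w3.org/ns/xproc"
                xmlns:c="http://www.w3.org/ns/xproc-step"
                version="1.0">
  <p:output port="result"/>

  <!-- A trivial pipeline that copies an inline document to the output;
       it only serves to exercise the engine (and plug-in) configuration. -->
  <p:identity>
    <p:input port="source">
      <p:inline>
        <c:result>xDB plug-in configured</c:result>
      </p:inline>
    </p:input>
  </p:identity>
</p:declare-step>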

 

Twitter API Considerations

Twitter provides a RESTful HTTP API for interfacing the service with external systems. However, the API has two main limitations. First, it is paged: retrieving a long user timeline with a single API call is impossible; it must be done in steps, one API request per page, until the complete timeline is read. Second, to prevent abuse of the service and for performance reasons, access to the Twitter API is limited to a few hundred calls per user (and IP address) per hour. The second limitation is perhaps more severe than the first, as it makes archiving everything “at once” impossible: after a number of API calls, the requests simply start failing.
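
To give a concrete idea of what the paging looks like (the user id below is the one used later in this article; the max_id value is made up), fetching a long timeline boils down to a series of windowed requests such as:

http://api.twitter.com/1/statuses/user_timeline.xml?id=20510471&count=200&since_id=1
http://api.twitter.com/1/statuses/user_timeline.xml?id=20510471&count=200&since_id=1&max_id=22657645773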

 

The above suggests that if the Twitter REST API is to be used, archiving is best implemented in an incremental fashion: update the archive periodically, each time picking up where the previous run left off and adding a little bit more.

 

Another aspect of the Twitter API is that many of the API calls require caller authentication. Twitter provides two main ways of authenticating: basic authentication and OAuth. Basic authentication is very simple to use, but it involves sending a Twitter user name and password along with the API call. This poses an obvious security risk, because the credentials are sent in plain text. OAuth, or Open Authorization, is an open standard that uses a token-based model rather than user names and passwords to grant access to resources. It is worth noting that Twitter planned to turn off basic authentication in the API in August 2010 and use OAuth exclusively.

 

For simplicity, the sample pipelines in this article use basic authentication for accessing the Twitter API. OAuth is more secure than basic authentication, but it is also harder to implement: multiple requests are necessary to obtain and validate the security tokens, requests need to be cryptographically signed, and so on. Implementing OAuth in XProc could be a topic for another article.
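
The sample pipelines delegate authentication to a small helper step, tw:get-authentication-request, which the listings in the next section use but which is not reproduced in the text. The following is only a minimal sketch of how such a step could be written, assuming p:http-request's built-in basic authentication support; the actual implementation in the attached twitter.xpl may differ:

<p:declare-step type="tw:get-authentication-request">
  <p:option name="username" required="true"/>
  <p:option name="password" required="true"/>
  <p:output port="result"/>

  <!-- Sketch only: build a bare c:request configured for basic authentication;
       the href attribute is added later by the calling pipeline. -->
  <p:add-attribute match="/c:request" attribute-name="username">
    <p:input port="source">
      <p:inline>
        <c:request method="get" auth-method="basic" send-authorization="true"/>
      </p:inline>
    </p:input>
    <p:with-option name="attribute-value" select="$username"/>
  </p:add-attribute>

  <p:add-attribute match="/c:request" attribute-name="password">
    <p:with-option name="attribute-value" select="$password"/>
  </p:add-attribute>
</p:declare-step>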

 

Twitter Archive XProc Library

The Twitter archive “application” is not implemented as one monolithic XProc pipeline. Instead, it is modeled as a library of smaller, reusable pipelines that focus on individual areas of the functionality: logging in to Twitter, reading a timeline, storing tweets in xDB, etc.

 

The pipelines are in the http://www.example.org/ns/xproc-twitter namespace; the text below uses the namespace prefix “tw” for this namespace.

 

The complete pipeline library, twitter.xpl, is attached to this article. The remainder of this section will discuss the most important pipelines from the library.

 

tw:update-timeline

The “main” pipeline of the Twitter archive application is tw:update-timeline. It has three options:

 

  • user-id (required) — the id of the Twitter user whose timeline to update

  • username (required) — the username to use for authentication against the Twitter API

  • password (required) — the password to use for authentication against the Twitter API

<p:declare-step type="tw:update-timeline">
  <p:option name="user-id" required="true"/>
  <p:option name="username" required="true"/>
  <p:option name="password" required="true"/>

  <tw:get-last-status-id>
    <p:with-option name="user-id" select="$user-id"/>
  </tw:get-last-status-id>

  <tw:get-timeline>
    <p:with-option name="user-id" select="$user-id"/>
    <p:with-option name="username" select="$username"/>
    <p:with-option name="password" select="$password"/>
    <p:with-option name="since-id" select="/c:result"/>
  </tw:get-timeline>

  <tw:store-timeline/>
</p:declare-step>

 

The pipeline consists of three steps. First, it queries xDB for the id of the latest stored tweet (if any) of the user with id user-id. Then it calls the Twitter API to retrieve all newer tweets of that user, and finally it stores those tweets in xDB. All three steps — tw:get-last-status-id, tw:get-timeline, and tw:store-timeline — are implemented in plain XProc and are discussed below.

 

tw:get-last-status-id

The tw:get-last-status-id pipeline executes an XQuery against xDB to determine the id of the user's most recent tweet stored in the archive.

 

The pipeline has one option:

 

  • user-id (required) — id of the Twitter user to query

<p:declare-step type="tw:get-last-status-id">
  <p:option name="user-id" required="true"/>
  <p:output port="result"/>

  <p:xquery>
    <p:input port="source">
      <p:document href="xhive:/"/>
    </p:input>
    <p:input port="parameters">
      <p:empty/>
    </p:input>
    <p:with-param name="user-id" select="$user-id"/>
    <p:input port="query">
      <p:inline>
        <c:query>
          <![CDATA[
              declare variable $user-id as xs:string external;
              <c:result xmlns:c="http://www.w3.org/ns/xproc-step">
              {
              max((1, if (doc-available($user-id))
                      then (for $id in doc($user-id)/status/id return $id cast as xs:integer)
                      else ()))
              }
              </c:result>
          ]]>
        </c:query>
      </p:inline>
    </p:input>
  </p:xquery>
</p:declare-step>

 

The pipeline shows that the xDB integration is indeed transparent in XProc Engine: no ad-hoc extension XProc steps are used. The only xDB-specific piece is the URI xhive:/ used as the context item for executing the XQuery. The xhive URI scheme tells the engine to interpret the rest of the URI as an absolute path in an xDB database. Typically, the path points to an XML document, but it can also address an xDB library, which is a convenient way to query whole collections of documents. In the source code above, the path is just a single slash (“/”), which means that the URI points to the database root.
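
For example (with a made-up status id, following the storage layout described later in this article), documents and libraries in the archive could be addressed as follows:

     xhive:/                            the database root
     xhive:/20510471                    the library holding all archived tweets of user 20510471
     xhive:/20510471/22657645773.xml    a single archived tweet of that user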

 

The XQuery used in the tw:get-last-status-id pipeline takes a user id (the user-id external variable) and returns the id of the most recent tweet stored in the database for that particular user. If no tweet is found, the query returns the value 1. To make sure that the result can be further processed in XProc, the query wraps it in an XML element.
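
In other words, the step always emits a single c:result document. Assuming the newest stored tweet of the user has id 22657645773 (a made-up value), the output would be:

<c:result xmlns:c="http://www.w3.org/ns/xproc-step">22657645773</c:result>

For a user with no stored tweets, the element would simply contain 1.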

 

The XQuery assumes that the tweets are partitioned in libraries based on user ids in the xDB database — see the description of the tw:store-status pipeline for more details.

 

tw:get-timeline

The tw:get-timeline pipeline represents the core of the Twitter archive library: it calls the Twitter API to get the timeline of a Twitter user and returns it as a sequence of XML documents (one per tweet). It is also one of the longest and most complex pipelines in the library, because it needs to deal with the paging of the results.

 

The pipeline has five options:

 

  • user-id (required) — the id of the Twitter user whose timeline to return

  • username (required) — the username to use for authentication against the Twitter API

  • password (required) — the password to use for authentication against the Twitter API

  • max-id (optional) — the maximum tweet id to return

  • since-id (optional) — the minimum tweet id to return

The pipeline returns all tweets whose id lies between since-id and max-id. If max-id is unspecified, tweets up to the most recent one are returned. Because of paging, the pipeline is implemented recursively: it fetches a window of up to 200 tweets and, if there are more, calls itself to fetch the rest.

 

<p:declare-step type="tw:get-timeline">
  <p:option name="user-id" required="false"/>
  <p:option name="username" required="true"/>
  <p:option name="password" required="true"/>
  <p:option name="max-id" required="false"/>
  <p:option name="since-id" select="1"/>
  <p:output port="result" sequence="true"/>

  <p:variable name="url-prefix" select="concat('http://api.twitter.com/1/statuses/user_timeline.xml?id=',$user-id,'&')"/>

  <p:group name="get-start">
    <p:output port="result"/>

    <tw:get-authentication-request>
      <p:with-option name="username" select="$username"/>
      <p:with-option name="password" select="$password"/>
    </tw:get-authentication-request>
  
    <p:choose>
      <p:when test="p:value-available('max-id')">
        <p:add-attribute match="/c:request" attribute-name="href">
          <p:with-option name="attribute-value"
                         select="concat($url-prefix,'count=200&amp;since_id=',$since-id,'&amp;max_id=',$max-id)"/>
        </p:add-attribute>
      </p:when>
      <p:otherwise>
        <p:add-attribute match="/c:request" attribute-name="href">
          <p:with-option name="attribute-value"
                         select="concat($url-prefix,'count=200&amp;since_id=',$since-id)"/>
        </p:add-attribute>
      </p:otherwise>
    </p:choose>
  
    <p:http-request/>
  </p:group>
  
  <p:group name="get-rest">
    <p:output port="result" sequence="true"/>

    <p:choose>
      <p:when test="/statuses/status">
        <p:output port="result" sequence="true"/>
        <tw:get-timeline>
          <p:with-option name="username" select="$username"/>
          <p:with-option name="password" select="$password"/>
          <p:with-option name="user-id" select="$user-id"/>
          <p:with-option name="max-id" select="/statuses/status[last()]/id - 1"/>
          <p:with-option name="since-id" select="$since-id"/>
        </tw:get-timeline>
      </p:when>
      <p:otherwise>
        <p:output port="result" sequence="true"/>
        <p:identity>
          <p:input port="source">
            <p:empty/>
          </p:input>
        </p:identity>
      </p:otherwise>
    </p:choose>
  </p:group>

  <p:identity name="select-start-statuses">
    <p:input port="source" select="/statuses/status">
      <p:pipe step="get-start" port="result"/>
    </p:input>
  </p:identity>

  <p:identity>
    <p:input port="source">
      <p:pipe step="select-start-statuses" port="result"/>
      <p:pipe step="get-rest" port="result"/>
    </p:input>
  </p:identity>
</p:declare-step>

tw:store-timeline

The tw:store-timeline pipeline takes a sequence of XML documents representing a collection of tweets and stores them in xDB. The pipeline is a simple loop that calls the tw:store-status pipeline for each document.

 

<p:declare-step type="tw:store-timeline">
  <p:input port="source" sequence="true"/>

  <p:for-each name="for">
    <p:iteration-source select="/status"/>
    <tw:store-status/>
  </p:for-each>
</p:declare-step>

tw:store-status

The tw:store-status pipeline takes an XML document representing a single tweet and stores it in xDB.

 

<p:declare-step type="tw:store-status">
  <p:input port="source"/>
  <p:output port="result" primary="false">
    <p:pipe step="store" port="result"/>
  </p:output>

  <p:variable name="username" select="/status/user/screen_name"/>
  <p:variable name="user-id" select="/status/user/id"/>
  <p:variable name="status-id" select="/status/id"/>
  <p:variable name="status-doc-uri"
              select="concat('xhive:/',$user-id,'/',$status-id,'.xml')"/>

  <p:store name="store">
    <p:with-option name="href" select="$status-doc-uri"/>
  </p:store>
</p:declare-step>

 

The pipeline uses the standard XProc p:store step for storing the document in xDB. The tweets are partitioned by user id in xDB: each user id has its own library where that user's tweets are stored.

 

Figure 1. xDB Library Structure (image: xdb-twitter.jpg)

tw:get-friends

The tw:get-friends pipeline can be used to get the list of friends of a Twitter user. Like tw:get-timeline, the pipeline is recursive in order to deal with the paging limits of the Twitter API.

 

The pipeline has four options:

 

  • user-id (required) — the id of the Twitter user whose friends to return

  • username (required) — the username to use for authentication against the Twitter API

  • password (required) — the password to use for authentication against the Twitter API

  • cursor (optional) — identifier of the friends page to return

<p:declare-step type="tw:get-friends">
  <p:option name="user-id" required="true"/>
  <p:option name="username" required="true"/>
  <p:option name="password" required="true"/>
  <p:option name="cursor" select="-1"/>
  <p:output port="result" sequence="true"/>

  <p:group name="get-start">
    <p:output port="result"/>

    <tw:get-authentication-request>
      <p:with-option name="username" select="$username"/>
      <p:with-option name="password" select="$password"/>
    </tw:get-authentication-request>
  
    <p:add-attribute match="/c:request" attribute-name="href">
      <p:with-option name="attribute-value"
                     select="concat('http://api.twitter.com/1/friends/ids.xml?id=',$user-id,'&cursor=',$cursor)"/>
    </p:add-attribute>
  
    <p:http-request/>
  </p:group>

  <p:group name="get-rest">
    <p:output port="result" sequence="true"/>

    <p:choose>
      <p:when test="/id_list/next_cursor != '0'">
        <p:output port="result" sequence="true"/>
        <tw:get-friends>
          <p:with-option name="username" select="$username"/>
          <p:with-option name="password" select="$password"/>
          <p:with-option name="user-id" select="$user-id"/>
          <p:with-option name="cursor" select="/id_list/next_cursor"/>
        </tw:get-friends>
      </p:when>
      <p:otherwise>
        <p:output port="result" sequence="true"/>
        <p:identity>
          <p:input port="source">
            <p:empty/>
          </p:input>
        </p:identity>
      </p:otherwise>
    </p:choose>
  </p:group>

  <p:for-each name="select-start-users">
    <p:output port="result"/>
    <p:iteration-source select="/id_list/ids/id">
      <p:pipe step="get-start" port="result"/>
    </p:iteration-source>
  
    <p:string-replace match="/c:result/text()">
      <p:with-option name="replace" select="/id"/>
      <p:input port="source">
        <p:inline>
          <c:result>foo</c:result>
        </p:inline>
      </p:input>
    </p:string-replace>
  </p:for-each>

  <p:identity>
    <p:input port="source">
      <p:pipe step="select-start-users" port="result"/>
      <p:pipe step="get-rest" port="result"/>
    </p:input>
  </p:identity>

</p:declare-step>

Using the Library

The Twitter Archive XProc library can be imported in other XProc pipelines or libraries and used in various ways; this section presents a number of examples.

 

Archiving Tweets of a Single User

The example below shows a pipeline that updates the archive for a single user with id user-id:

 


<p:declare-step xmlns:p="http://www.w3.org/ns/xproc"
                xmlns:tw="http://www.example.org/ns/xproc-twitter"
                version="1.0">

  <p:import href="twitter.xpl"/>

  <p:option name="user-id" required="true"/>
  <p:option name="username" required="true"/>
  <p:option name="password" required="true"/>

  <tw:update-timeline>
    <p:with-option name="user-id" select="$user-id"/>
    <p:with-option name="username" select="$username"/>
    <p:with-option name="password" select="$password"/>
  </tw:update-timeline>
</p:declare-step>

 

Save the pipeline above to a file, for example update-user.xpl. The command below shows how to execute the pipeline to update the timeline of the Twitter user with id 20510471 (vojtechtoman):

 

calumet -c twitter-config.xml update-user.xpl user-id=20510471 username=... password=...

 

For continuous, automated archiving, you may want to execute the above command periodically, for instance by scheduling a cron job.
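
For example, a crontab entry along the following lines (the paths are placeholders) would update the archive once an hour:

     0 * * * * calumet -c /home/archive/twitter-config.xml /home/archive/update-user.xpl user-id=20510471 username=... password=... >> /home/archive/update-user.log 2>&1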

 

Archiving Tweets of Multiple Users

A slightly more advanced use of the library would be to update not only the timeline of a single user, but also the timelines of his friends. The pipeline below extends the previous pipeline: it first updates the timeline of the user with id user-id and then updates the timelines of five randomly selected friends. (While it would be technically easier to simply update the timelines of all friends, such a solution could easily exceed the hourly limit on Twitter API calls and cause the pipeline to fail.)

 

<p:declare-step xmlns:p="http://www.w3.org/ns/xproc"
                xmlns:c="http://www.w3.org/ns/xproc-step"
                xmlns:tw="http://www.example.org/ns/xproc-twitter"
                version="1.0">

  <p:import href="twitter.xpl"/>

  <p:option name="user-id" required="true"/>
  <p:option name="username" required="true"/>
  <p:option name="password" required="true"/>

  <tw:update-timeline>
    <p:with-option name="user-id" select="$user-id"/>
    <p:with-option name="username" select="$username"/>
    <p:with-option name="password" select="$password"/>
  </tw:update-timeline>

  <tw:get-friends>
    <p:with-option name="user-id" select="$user-id"/>
    <p:with-option name="username" select="$username"/>
    <p:with-option name="password" select="$password"/>
  </tw:get-friends>

  <p:xquery>
    <p:input port="query">
      <p:inline>
        <c:query>
          <![CDATA[
               import module namespace rnd = "java:java.util.Random";

               declare function local:swap($n as xs:integer, $seq) {
                 if ($n le 0) then
                   $seq
                 else
                   let $random := rnd:new()
                   let $len := count($seq) cast as xs:int
                   let $r1 := rnd:nextInt($random, $len) + 1
                   let $r2 := rnd:nextInt($random, $len) + 1
                   let $i1 := min(($r1, $r2))
                   let $i2 := max(($r1, $r2))
                   return if ($i1 = $i2) then
                   local:swap($n - 1, $seq)
                 else
                   local:swap($n - 1,
                     (subsequence($seq, 1, $i1 - 1),
                      $seq[$i2],
                      subsequence($seq, $i1 + 1, $i2 - $i1 - 1),
                      $seq[$i1],
                      subsequence($seq, $i2 +1)))
                 };

                 declare function local:shuffle($seq) {
                   local:swap(count($seq), $seq)
                 };

                 subsequence(local:shuffle(collection()), 1, 5)
          ]]>
        </c:query>
      </p:inline>
    </p:input>
    <p:input port="parameters">
      <p:empty/>
    </p:input>
  </p:xquery>

  <p:for-each>
    <p:variable name="friend-id" select="/c:result"/>
    <tw:update-timeline>
      <p:with-option name="user-id" select="$friend-id"/>
      <p:with-option name="username" select="$username"/>
      <p:with-option name="password" select="$password"/>
    </tw:update-timeline>
  </p:for-each>
</p:declare-step>

 

This pipeline is notable for its use of XQuery to select five random friends of the user. Neither standard XProc nor standard XQuery supports random number generation, but xDB's XQuery implementation lets you overcome this limitation by, for instance, importing the Java java.util.Random class into your XQuery (as is done above). An alternative would be to implement an XProc extension step that generates random numbers; however, that would require registering an additional plug-in with the XProc Engine.

 

Going Further: Exploring the Archive

The two example pipelines above show just two possible ways of getting tweets into xDB. Next, you may find it interesting to query, and potentially further process, the information stored in the archive. To get you started, the (almost trivial) example below shows a pipeline that returns all stored tweets that mention both “xdb” and “xproc”, ordered by their id:

 

<p:declare-step xmlns:p="http://www.w3.org/ns/xproc"
                xmlns:c="http://www.w3.org/ns/xproc-step"
                xmlns:tw="http://www.example.org/ns/xproc-twitter"
                version="1.0">

  <p:output port="result" sequence="true"/>

  <p:xquery>
    <p:input port="source">
      <p:document href="xhive:/"/>
    </p:input>
    <p:input port="query">
      <p:inline>
        <c:query>
          <![CDATA[
               for $status in /status[text ftcontains "xdb" ftand "xproc"]
               order by $status/id cast as xs:integer
               return $status
          ]]>
        </c:query>
      </p:inline>
    </p:input>
    <p:input port="parameters">
      <p:empty/>
    </p:input>
  </p:xquery>
</p:declare-step>
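
Assuming you save the pipeline as, say, search-tweets.xpl (the name is arbitrary), it can be run like the other examples:

     calumet -c twitter-config.xml search-tweets.xpl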

Summary

XProc is a powerful language for XML processing and XML application integration. The sample XProc pipelines in this article show how XProc can be used with xDB to create a simple, but potentially powerful Twitter archiving application.