[ap4r-devel] [248] trunk/ap4r: Added PostgreSQL support.

kato-k at rubyforge.org kato-k at rubyforge.org
Mon Aug 6 05:33:31 EDT 2007


Revision: 248
Author:   kato-k
Date:     2007-08-06 05:33:28 -0400 (Mon, 06 Aug 2007)

Log Message:
-----------
Added PostgreSQL support.

Modified Paths:
--------------
    trunk/ap4r/History.txt
    trunk/ap4r/lib/ap4r/message_store_ext.rb

Added Paths:
-----------
    trunk/ap4r/config/queues_pgsql.cfg
    trunk/ap4r/lib/ap4r/postgresql.sql

Modified: trunk/ap4r/History.txt
===================================================================
--- trunk/ap4r/History.txt	2007-08-02 00:39:29 UTC (rev 247)
+++ trunk/ap4r/History.txt	2007-08-06 09:33:28 UTC (rev 248)
@@ -1,7 +1,8 @@
 == 0.3.x
 
 === 0.3.3 (June ?, 2007)
-* Added: support with hoe.
+* Added: Support with hoe.
+* Added: Support PostgreSQL as reliable RDBMS persistence.
 
 === 0.3.2 (June 7th, 2007)
 * Fixed: util/loc.rb doesn't work.

Added: trunk/ap4r/config/queues_pgsql.cfg
===================================================================
--- trunk/ap4r/config/queues_pgsql.cfg	                        (rev 0)
+++ trunk/ap4r/config/queues_pgsql.cfg	2007-08-06 09:33:28 UTC (rev 248)
@@ -0,0 +1,19 @@
+--- 
+store: 
+  type: postgresql
+  uri: # default is tcp://localhost:5432
+  database: ap4r
+  username: ap4r
+  password: ap4r
+drb: 
+  host: 
+  port: 6438
+  acl: allow 127.0.0.1 allow ::1 allow 10.0.0.0/8
+dispatchers:
+  -
+    targets: queue.*
+    threads: 1
+#carriers:
+#  - 
+#    source_uri: druby://another.host.local:6438
+#    threads: 1

Modified: trunk/ap4r/lib/ap4r/message_store_ext.rb
===================================================================
--- trunk/ap4r/lib/ap4r/message_store_ext.rb	2007-08-02 00:39:29 UTC (rev 247)
+++ trunk/ap4r/lib/ap4r/message_store_ext.rb	2007-08-06 09:33:28 UTC (rev 248)
@@ -33,6 +33,175 @@
 
     end
 
+    begin
+
+      # Make sure we have a MySQL library before creating this class,
+      # worst case we end up with a disk-based message store. Try the
+      # native MySQL library, followed by the Rails MySQL library.
+      begin
+        require 'postgres-pr/connection'
+      rescue LoadError
+        puts 'hoge'
+      end
+
+      class PostgreSQL < Base #:nodoc:
+
+        TYPE = self.name.split('::').last.downcase
+
+        @@stores[TYPE] = self
+
+        # Default prefix for tables in the database.
+        DEFAULT_PREFIX = 'reliable_msg_';
+
+        # Reference to an open PostgreSQL connection held in the current thread.
+        THREAD_CURRENT_PGSQL = :reliable_msg_pgsql #:nodoc:
+
+        def initialize config, logger
+          super logger
+          @config = { :host=>config['host'], :username=>config['username'], :password=>config['password'],
+            :database=>config['database'], :port=>config['port'], :socket=>config['socket'] }
+          @prefix = config['prefix'] || DEFAULT_PREFIX
+          @queues_table = "#{@prefix}queues"
+          @topics_table = "#{@prefix}topics"
+        end
+
+        def type
+          TYPE
+        end
+
+        def setup
+          pgsql = connection
+          requires = 2 # Number of tables used by reliable-msg.
+          pgsql.query "\dt" do |result|
+            while row = result.fetch_row
+              requires -= 1 if row[0] == @queues_table || row[0] == @topics_table
+            end
+          end
+          if requires > 0
+            sql = File.open File.join(File.dirname(__FILE__), "postgresql.sql"), "r" do |input|
+              input.readlines.join
+            end
+            sql.gsub! DEFAULT_PREFIX, @prefix
+            pgsql.query sql
+            true
+          end
+        end
+
+
+        def configuration
+          config = { "type"=>TYPE, "host"=>@config[:host], "username"=>@config[:username],
+            "password"=>@config[:password], "database"=>@config[:database] }
+          config["port"] = @config[:port] if @config[:port]
+          config["socket"] = @config[:socket] if @config[:socket]
+          config["prefix"] = @config[:prefix] if @config[:prefix]
+          config
+        end
+
+
+        def activate
+          super
+          load_index
+        end
+
+
+        def deactivate
+          Thread.list.each do |thread|
+            if conn = thread[THREAD_CURRENT_PGSQL]
+              thread[THREAD_CURRENT_PGSQL] = nil
+              conn.close
+            end
+          end
+          super
+        end
+
+
+        protected
+
+        def update inserts, deletes, dlqs
+          pgsql = connection
+          pgsql.query "BEGIN"
+          begin
+            inserts.each do |insert|
+              if insert[:queue]
+                pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote  insert[:id]}','#{connection.quote insert[:queue]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
+#                pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote  insert[:id]}','#{connection.quote insert[:queue]}',BINARY '#{connection.quote Marshal::dump(insert[:headers])}',BINARY '#{connection.quote insert[:message]}')"
+              else
+                pgsql.query "REPLACE #{@topics_table} (topic,headers,object) VALUES('#{connection.quote insert[:topic]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
+              end
+            end
+            ids = deletes.inject([]) do |array, delete|
+              delete[:queue] ? array << "'#{delete[:id]}'" : array
+            end
+            if !ids.empty?
+              pgsql.query "DELETE FROM #{@queues_table} WHERE id IN (#{ids.join ','})"
+            end
+            dlqs.each do |dlq|
+              pgsql.query "UPDATE #{@queues_table} SET queue='#{Queue::DLQ}' WHERE id='#{connection.quote dlq[:id]}'"
+            end
+            pgsql.query "COMMIT"
+          rescue Exception=>error
+            pgsql.query "ROLLBACK"
+            raise error
+          end
+          super
+        end
+
+
+        def load_index
+          connection.query "SELECT id,queue,headers FROM #{@queues_table}" do |result|
+            while row = result.fetch_row
+              queue = @queues[row[1]] ||= []
+              headers = Marshal::load row[2]
+              # Add element based on priority, higher priority comes first.
+              priority = headers[:priority]
+              added = false
+              queue.each_index do |idx|
+                if queue[idx][:priority] < priority
+                  queue[idx, 0] = headers
+                  added = true
+                  break
+                end
+              end
+              queue << headers unless added
+            end
+          end
+          connection.query "SELECT topic,headers FROM #{@topics_table}" do |result|
+            while row = result.fetch_row
+              @topics[row[0]] = Marshal::load row[1]
+            end
+          end
+        end
+
+
+        def load id, type, queue_or_topic
+          message = nil
+          if type == :queue
+            connection.query "SELECT object FROM #{@queues_table} WHERE id='#{id}'" do |result|
+              message = if row = result.fetch_row
+                          row[0]
+                        end
+            end
+          else
+            connection.query "SELECT object FROM #{@topics_table} WHERE topic='#{queue_or_topic}'" do |result|
+              message = if row = result.fetch_row
+                          row[0]
+                        end
+            end
+          end
+          message
+        end
+
+        def connection
+#          Thread.current[THREAD_CURRENT_PGSQL] ||= ::PostgresPR::Connection.new @config[:host], @config[:username], @config[:password], at config[:database], @config[:port], @config[:socket]
+          Thread.current[THREAD_CURRENT_PGSQL] ||= ::PostgresPR::Connection.new @config[:database], @config[:username], @config[:password], @config[:uri]
+        end
+
+      end
+
+    rescue LoadError
+      # do nothing
+    end
+    
   end
 end
 
@@ -57,3 +226,21 @@
     end
   end
 end
+
+
+module PostgresPR
+  class Connection
+    def escape_string(str)
+      str.gsub(/([\0\n\r\032\'\"\\])/) do
+        case $1
+        when "\0" then "\\0"
+        when "\n" then "\\n"
+        when "\r" then "\\r"
+        when "\032" then "\\Z"
+        else "\\"+$1
+        end
+      end
+    end
+    alias :quote :escape_string
+  end
+end

Added: trunk/ap4r/lib/ap4r/postgresql.sql
===================================================================
--- trunk/ap4r/lib/ap4r/postgresql.sql	                        (rev 0)
+++ trunk/ap4r/lib/ap4r/postgresql.sql	2007-08-06 09:33:28 UTC (rev 248)
@@ -0,0 +1,13 @@
+CREATE TABLE reliable_msg_queues (
+  id character varying(255) NOT NULL default '',
+  queue character varying(255) NOT NULL default '',
+  headers text NOT NULL,
+  object bytea NOT NULL,
+  PRIMARY KEY  (id)
+)
+CREATE TABLE reliable_msg_topics (
+  topic character varying(255) NOT NULL default '',
+  headers text NOT NULL,
+  object bytea NOT NULL,
+  PRIMARY KEY  (topic)
+)




More information about the ap4r-devel mailing list