[ap4r-devel] [248] trunk/ap4r: Added PostgreSQL support.
kato-k at rubyforge.org
kato-k at rubyforge.org
Mon Aug 6 05:33:31 EDT 2007
Revision: 248
Author: kato-k
Date: 2007-08-06 05:33:28 -0400 (Mon, 06 Aug 2007)
Log Message:
-----------
Added PostgreSQL support.
Modified Paths:
--------------
trunk/ap4r/History.txt
trunk/ap4r/lib/ap4r/message_store_ext.rb
Added Paths:
-----------
trunk/ap4r/config/queues_pgsql.cfg
trunk/ap4r/lib/ap4r/postgresql.sql
Modified: trunk/ap4r/History.txt
===================================================================
--- trunk/ap4r/History.txt 2007-08-02 00:39:29 UTC (rev 247)
+++ trunk/ap4r/History.txt 2007-08-06 09:33:28 UTC (rev 248)
@@ -1,7 +1,8 @@
== 0.3.x
=== 0.3.3 (June ?, 2007)
-* Added: support with hoe.
+* Added: Support with hoe.
+* Added: Support PostgreSQL as reliable RDBMS persistence.
=== 0.3.2 (June 7th, 2007)
* Fixed: util/loc.rb doesn't work.
Added: trunk/ap4r/config/queues_pgsql.cfg
===================================================================
--- trunk/ap4r/config/queues_pgsql.cfg (rev 0)
+++ trunk/ap4r/config/queues_pgsql.cfg 2007-08-06 09:33:28 UTC (rev 248)
@@ -0,0 +1,19 @@
+---
+store:
+ type: postgresql
+ uri: # default is tcp://localhost:5432
+ database: ap4r
+ username: ap4r
+ password: ap4r
+drb:
+ host:
+ port: 6438
+ acl: allow 127.0.0.1 allow ::1 allow 10.0.0.0/8
+dispatchers:
+ -
+ targets: queue.*
+ threads: 1
+#carriers:
+# -
+# source_uri: druby://another.host.local:6438
+# threads: 1
Modified: trunk/ap4r/lib/ap4r/message_store_ext.rb
===================================================================
--- trunk/ap4r/lib/ap4r/message_store_ext.rb 2007-08-02 00:39:29 UTC (rev 247)
+++ trunk/ap4r/lib/ap4r/message_store_ext.rb 2007-08-06 09:33:28 UTC (rev 248)
@@ -33,6 +33,175 @@
end
+ begin
+
+ # Make sure we have a MySQL library before creating this class,
+ # worst case we end up with a disk-based message store. Try the
+ # native MySQL library, followed by the Rails MySQL library.
+ begin
+ require 'postgres-pr/connection'
+ rescue LoadError
+ puts 'hoge'
+ end
+
+ class PostgreSQL < Base #:nodoc:
+
+ TYPE = self.name.split('::').last.downcase
+
+ @@stores[TYPE] = self
+
+ # Default prefix for tables in the database.
+ DEFAULT_PREFIX = 'reliable_msg_';
+
+ # Reference to an open PostgreSQL connection held in the current thread.
+ THREAD_CURRENT_PGSQL = :reliable_msg_pgsql #:nodoc:
+
+ def initialize config, logger
+ super logger
+ @config = { :host=>config['host'], :username=>config['username'], :password=>config['password'],
+ :database=>config['database'], :port=>config['port'], :socket=>config['socket'] }
+ @prefix = config['prefix'] || DEFAULT_PREFIX
+ @queues_table = "#{@prefix}queues"
+ @topics_table = "#{@prefix}topics"
+ end
+
+ def type
+ TYPE
+ end
+
+ def setup
+ pgsql = connection
+ requires = 2 # Number of tables used by reliable-msg.
+ pgsql.query "\dt" do |result|
+ while row = result.fetch_row
+ requires -= 1 if row[0] == @queues_table || row[0] == @topics_table
+ end
+ end
+ if requires > 0
+ sql = File.open File.join(File.dirname(__FILE__), "postgresql.sql"), "r" do |input|
+ input.readlines.join
+ end
+ sql.gsub! DEFAULT_PREFIX, @prefix
+ pgsql.query sql
+ true
+ end
+ end
+
+
+ def configuration
+ config = { "type"=>TYPE, "host"=>@config[:host], "username"=>@config[:username],
+ "password"=>@config[:password], "database"=>@config[:database] }
+ config["port"] = @config[:port] if @config[:port]
+ config["socket"] = @config[:socket] if @config[:socket]
+ config["prefix"] = @config[:prefix] if @config[:prefix]
+ config
+ end
+
+
+ def activate
+ super
+ load_index
+ end
+
+
+ def deactivate
+ Thread.list.each do |thread|
+ if conn = thread[THREAD_CURRENT_PGSQL]
+ thread[THREAD_CURRENT_PGSQL] = nil
+ conn.close
+ end
+ end
+ super
+ end
+
+
+ protected
+
+ def update inserts, deletes, dlqs
+ pgsql = connection
+ pgsql.query "BEGIN"
+ begin
+ inserts.each do |insert|
+ if insert[:queue]
+ pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote insert[:id]}','#{connection.quote insert[:queue]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
+# pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote insert[:id]}','#{connection.quote insert[:queue]}',BINARY '#{connection.quote Marshal::dump(insert[:headers])}',BINARY '#{connection.quote insert[:message]}')"
+ else
+ pgsql.query "REPLACE #{@topics_table} (topic,headers,object) VALUES('#{connection.quote insert[:topic]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
+ end
+ end
+ ids = deletes.inject([]) do |array, delete|
+ delete[:queue] ? array << "'#{delete[:id]}'" : array
+ end
+ if !ids.empty?
+ pgsql.query "DELETE FROM #{@queues_table} WHERE id IN (#{ids.join ','})"
+ end
+ dlqs.each do |dlq|
+ pgsql.query "UPDATE #{@queues_table} SET queue='#{Queue::DLQ}' WHERE id='#{connection.quote dlq[:id]}'"
+ end
+ pgsql.query "COMMIT"
+ rescue Exception=>error
+ pgsql.query "ROLLBACK"
+ raise error
+ end
+ super
+ end
+
+
+ def load_index
+ connection.query "SELECT id,queue,headers FROM #{@queues_table}" do |result|
+ while row = result.fetch_row
+ queue = @queues[row[1]] ||= []
+ headers = Marshal::load row[2]
+ # Add element based on priority, higher priority comes first.
+ priority = headers[:priority]
+ added = false
+ queue.each_index do |idx|
+ if queue[idx][:priority] < priority
+ queue[idx, 0] = headers
+ added = true
+ break
+ end
+ end
+ queue << headers unless added
+ end
+ end
+ connection.query "SELECT topic,headers FROM #{@topics_table}" do |result|
+ while row = result.fetch_row
+ @topics[row[0]] = Marshal::load row[1]
+ end
+ end
+ end
+
+
+ def load id, type, queue_or_topic
+ message = nil
+ if type == :queue
+ connection.query "SELECT object FROM #{@queues_table} WHERE id='#{id}'" do |result|
+ message = if row = result.fetch_row
+ row[0]
+ end
+ end
+ else
+ connection.query "SELECT object FROM #{@topics_table} WHERE topic='#{queue_or_topic}'" do |result|
+ message = if row = result.fetch_row
+ row[0]
+ end
+ end
+ end
+ message
+ end
+
+ def connection
+# Thread.current[THREAD_CURRENT_PGSQL] ||= ::PostgresPR::Connection.new @config[:host], @config[:username], @config[:password], at config[:database], @config[:port], @config[:socket]
+ Thread.current[THREAD_CURRENT_PGSQL] ||= ::PostgresPR::Connection.new @config[:database], @config[:username], @config[:password], @config[:uri]
+ end
+
+ end
+
+ rescue LoadError
+ # do nothing
+ end
+
end
end
@@ -57,3 +226,21 @@
end
end
end
+
+
+module PostgresPR
+ class Connection
+ def escape_string(str)
+ str.gsub(/([\0\n\r\032\'\"\\])/) do
+ case $1
+ when "\0" then "\\0"
+ when "\n" then "\\n"
+ when "\r" then "\\r"
+ when "\032" then "\\Z"
+ else "\\"+$1
+ end
+ end
+ end
+ alias :quote :escape_string
+ end
+end
Added: trunk/ap4r/lib/ap4r/postgresql.sql
===================================================================
--- trunk/ap4r/lib/ap4r/postgresql.sql (rev 0)
+++ trunk/ap4r/lib/ap4r/postgresql.sql 2007-08-06 09:33:28 UTC (rev 248)
@@ -0,0 +1,13 @@
+CREATE TABLE reliable_msg_queues ( -- one row per queued message, keyed by message id
+  id character varying(255) NOT NULL default '',
+  queue character varying(255) NOT NULL default '',
+  headers text NOT NULL,
+  object bytea NOT NULL,
+  PRIMARY KEY (id)
+);
+CREATE TABLE reliable_msg_topics ( -- one row per topic, keyed by topic name
+  topic character varying(255) NOT NULL default '',
+  headers text NOT NULL,
+  object bytea NOT NULL,
+  PRIMARY KEY (topic)
+);
More information about the ap4r-devel
mailing list