[ap4r-devel] [280] trunk/ap4r/lib/ap4r: Fixed: messages remaining in PostgreSQL were not loaded when AP4R restarted.

kato-k at rubyforge.org kato-k at rubyforge.org
Thu Aug 30 01:49:18 EDT 2007


Revision: 280
Author:   kato-k
Date:     2007-08-30 01:49:17 -0400 (Thu, 30 Aug 2007)

Log Message:
-----------
Fixed: messages remaining in PostgreSQL were not loaded when AP4R restarted.

Modified Paths:
--------------
    trunk/ap4r/lib/ap4r/message_store_ext.rb
    trunk/ap4r/lib/ap4r/postgresql.sql

Modified: trunk/ap4r/lib/ap4r/message_store_ext.rb
===================================================================
--- trunk/ap4r/lib/ap4r/message_store_ext.rb	2007-08-30 03:59:01 UTC (rev 279)
+++ trunk/ap4r/lib/ap4r/message_store_ext.rb	2007-08-30 05:49:17 UTC (rev 280)
@@ -38,7 +38,7 @@
       # Make sure we have a PostgreSQL library before creating this class,
       # worst case we end up with a disk-based message store. Try the
       # native PostgreSQL library, followed by the pure Ruby PostgreSQL library.
-      begin 
+      begin
         require 'postgres'
       rescue LoadError
         require 'postgres-pr/connection'
@@ -123,10 +123,9 @@
           begin
             inserts.each do |insert|
               if insert[:queue]
-                pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote  insert[:id]}','#{connection.quote insert[:queue]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
-#                pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote  insert[:id]}','#{connection.quote insert[:queue]}',BINARY '#{connection.quote Marshal::dump(insert[:headers])}',BINARY '#{connection.quote insert[:message]}')"
+                pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote  insert[:id]}', '#{connection.quote insert[:queue]}', '#{connection.quote YAML.dump(insert[:headers])}', '#{connection.quote insert[:message]}')"
               else
-                pgsql.query "REPLACE #{@topics_table} (topic,headers,object) VALUES('#{connection.quote insert[:topic]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
+                pgsql.query "REPLACE #{@topics_table} (topic,headers,object) VALUES('#{connection.quote insert[:topic]}','#{connection.quote YAML.dump(insert[:headers])}','#{connection.quote insert[:message]}')"
               end
             end
             ids = deletes.inject([]) do |array, delete|
@@ -149,9 +148,9 @@
 
         def load_index
           connection.query "SELECT id,queue,headers FROM #{@queues_table}" do |result|
-            while row = result.fetch_row
-              queue = @queues[row[1]] ||= []
-              headers = Marshal::load row[2]
+            result.each do |tuple|
+              queue = @queues[tuple[1]] ||= []
+              headers = YAML.load tuple[2]
               # Add element based on priority, higher priority comes first.
               priority = headers[:priority]
               added = false
@@ -166,8 +165,8 @@
             end
           end
           connection.query "SELECT topic,headers FROM #{@topics_table}" do |result|
-            while row = result.fetch_row
-              @topics[row[0]] = Marshal::load row[1]
+            result.each do |tuple|
+              @topics[tuple[0]] = YAML.load tuple[1]
             end
           end
         end
@@ -177,15 +176,11 @@
           message = nil
           if type == :queue
             connection.query "SELECT object FROM #{@queues_table} WHERE id='#{id}'" do |result|
-              message = if row = result.fetch_row
-                          row[0]
-                        end
+              message = result[0][0] if result[0]
             end
           else
             connection.query "SELECT object FROM #{@topics_table} WHERE topic='#{queue_or_topic}'" do |result|
-              message = if row = result.fetch_row
-                          row[0]
-                        end
+              message = result[0][0] if result[0]
             end
           end
           message
@@ -211,26 +206,61 @@
   end
 end
 
-class PGconn
-  def quote str
-    PGconn.escape str
+if Object.const_defined? :PGError
+  class PGconn
+    alias original_query query
+
+    def query(q, *bind_values, &block)
+      # In PGconn, +query+ method does NOT care about a given block.
+      # To deal with a given block, this method adds iteration
+      # over query results.
+      maybe_result = exec(q, *bind_values)
+      puts "PGconn: query called by #{q}" if $DEBUG
+      puts "PGconn#query returns #{maybe_result}(class: #{maybe_result.class})." if $DEBUG
+      return maybe_result unless block && maybe_result.kind_of?(PGresult)
+      begin
+        puts "PGconn extention: about to yield result." if $DEBUG
+        block.call(maybe_result)
+      ensure
+        maybe_result.clear
+      end
+    end
+    
+    def quote str
+      # do nothing
+      str
+    end
+
   end
 end
 
-module PostgresPR
-  class Connection
-    def escape_string(str)
-      str.gsub(/([\0\n\r\032\'\"\\])/) do
-        case $1
-        when "\0" then "\\0"
-        when "\n" then "\\n"
-        when "\r" then "\\r"
-        when "\032" then "\\Z"
-        else "\\"+$1
+if Object.const_defined? :PostgresPR
+  module PostgresPR
+    class Connection
+      alias original_query query
+
+      def query(q, &block)
+        # In PostgresPR, +query+ method does NOT care about a given block.
+        # To deal with a given block, this method adds iteration
+        # over query results.
+        maybe_result = original_query(q, &block)
+        puts "PostgresPR: query called by #{q}" if $DEBUG
+        puts "PostgresPR::Connenction#query returns #{maybe_result}(class: #{maybe_result.class})." if $DEBUG
+        return maybe_result.rows unless block && maybe_result.kind_of?(PostgresPR::Connection::Result)
+        begin
+          puts "PostgresPR extention: about to yield result." if $DEBUG
+          block.call(maybe_result.rows)
+        ensure
+          maybe_result = nil
         end
       end
-    end
-    alias :quote :escape_string
+
+      def quote(str)
+        # do nothing
+        str
+      end
+      
+    end   
   end
 end
 

Modified: trunk/ap4r/lib/ap4r/postgresql.sql
===================================================================
--- trunk/ap4r/lib/ap4r/postgresql.sql	2007-08-30 03:59:01 UTC (rev 279)
+++ trunk/ap4r/lib/ap4r/postgresql.sql	2007-08-30 05:49:17 UTC (rev 280)
@@ -2,12 +2,12 @@
   id character varying(255) NOT NULL default '',
   queue character varying(255) NOT NULL default '',
   headers text NOT NULL,
-  object bytea NOT NULL,
+  object text NOT NULL,
   PRIMARY KEY  (id)
-)
+);
 CREATE TABLE reliable_msg_topics (
   topic character varying(255) NOT NULL default '',
   headers text NOT NULL,
-  object bytea NOT NULL,
+  object text NOT NULL,
   PRIMARY KEY  (topic)
-)
+);




More information about the ap4r-devel mailing list