[ap4r-devel] [280] trunk/ap4r/lib/ap4r: Fixed: fixed a bug that remaining messages in PostgreSQL are not loaded when AP4R restarts.
kato-k at rubyforge.org
kato-k at rubyforge.org
Thu Aug 30 01:49:18 EDT 2007
Revision: 280
Author: kato-k
Date: 2007-08-30 01:49:17 -0400 (Thu, 30 Aug 2007)
Log Message:
-----------
Fixed: a bug where messages remaining in PostgreSQL were not loaded when AP4R restarted.
Modified Paths:
--------------
trunk/ap4r/lib/ap4r/message_store_ext.rb
trunk/ap4r/lib/ap4r/postgresql.sql
Modified: trunk/ap4r/lib/ap4r/message_store_ext.rb
===================================================================
--- trunk/ap4r/lib/ap4r/message_store_ext.rb 2007-08-30 03:59:01 UTC (rev 279)
+++ trunk/ap4r/lib/ap4r/message_store_ext.rb 2007-08-30 05:49:17 UTC (rev 280)
@@ -38,7 +38,7 @@
# Make sure we have a PostgreSQL library before creating this class,
# worst case we end up with a disk-based message store. Try the
# native PostgreSQL library, followed by the pure Ruby PostgreSQL library.
- begin
+ begin
require 'postgres'
rescue LoadError
require 'postgres-pr/connection'
@@ -123,10 +123,9 @@
begin
inserts.each do |insert|
if insert[:queue]
- pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote insert[:id]}','#{connection.quote insert[:queue]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
-# pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote insert[:id]}','#{connection.quote insert[:queue]}',BINARY '#{connection.quote Marshal::dump(insert[:headers])}',BINARY '#{connection.quote insert[:message]}')"
+ pgsql.query "INSERT INTO #{@queues_table} (id,queue,headers,object) VALUES('#{connection.quote insert[:id]}', '#{connection.quote insert[:queue]}', '#{connection.quote YAML.dump(insert[:headers])}', '#{connection.quote insert[:message]}')"
else
- pgsql.query "REPLACE #{@topics_table} (topic,headers,object) VALUES('#{connection.quote insert[:topic]}','#{connection.quote Marshal::dump(insert[:headers])}','#{connection.quote insert[:message]}')"
+ pgsql.query "REPLACE #{@topics_table} (topic,headers,object) VALUES('#{connection.quote insert[:topic]}','#{connection.quote YAML.dump(insert[:headers])}','#{connection.quote insert[:message]}')"
end
end
ids = deletes.inject([]) do |array, delete|
@@ -149,9 +148,9 @@
def load_index
connection.query "SELECT id,queue,headers FROM #{@queues_table}" do |result|
- while row = result.fetch_row
- queue = @queues[row[1]] ||= []
- headers = Marshal::load row[2]
+ result.each do |tuple|
+ queue = @queues[tuple[1]] ||= []
+ headers = YAML.load tuple[2]
# Add element based on priority, higher priority comes first.
priority = headers[:priority]
added = false
@@ -166,8 +165,8 @@
end
end
connection.query "SELECT topic,headers FROM #{@topics_table}" do |result|
- while row = result.fetch_row
- @topics[row[0]] = Marshal::load row[1]
+ result.each do |tuple|
+ @topics[tuple[0]] = YAML.load tuple[1]
end
end
end
@@ -177,15 +176,11 @@
message = nil
if type == :queue
connection.query "SELECT object FROM #{@queues_table} WHERE id='#{id}'" do |result|
- message = if row = result.fetch_row
- row[0]
- end
+ message = result[0][0] if result[0]
end
else
connection.query "SELECT object FROM #{@topics_table} WHERE topic='#{queue_or_topic}'" do |result|
- message = if row = result.fetch_row
- row[0]
- end
+ message = result[0][0] if result[0]
end
end
message
@@ -211,26 +206,61 @@
end
end
-class PGconn
- def quote str
- PGconn.escape str
+if Object.const_defined? :PGError
+ class PGconn
+ alias original_query query
+
+ def query(q, *bind_values, &block)
+ # In PGconn, +query+ method does NOT care about a given block.
+ # To deal with a given block, this method adds iteration
+ # over query results.
+ maybe_result = exec(q, *bind_values)
+ puts "PGconn: query called by #{q}" if $DEBUG
+ puts "PGconn#query returns #{maybe_result}(class: #{maybe_result.class})." if $DEBUG
+ return maybe_result unless block && maybe_result.kind_of?(PGresult)
+ begin
+ puts "PGconn extention: about to yield result." if $DEBUG
+ block.call(maybe_result)
+ ensure
+ maybe_result.clear
+ end
+ end
+
+ def quote str
+ # do nothing
+ str
+ end
+
end
end
-module PostgresPR
- class Connection
- def escape_string(str)
- str.gsub(/([\0\n\r\032\'\"\\])/) do
- case $1
- when "\0" then "\\0"
- when "\n" then "\\n"
- when "\r" then "\\r"
- when "\032" then "\\Z"
- else "\\"+$1
+if Object.const_defined? :PostgresPR
+ module PostgresPR
+ class Connection
+ alias original_query query
+
+ def query(q, &block)
+ # In PostgresPR, +query+ method does NOT care about a given block.
+ # To deal with a given block, this method adds iteration
+ # over query results.
+ maybe_result = original_query(q, &block)
+ puts "PostgresPR: query called by #{q}" if $DEBUG
+ puts "PostgresPR::Connenction#query returns #{maybe_result}(class: #{maybe_result.class})." if $DEBUG
+ return maybe_result.rows unless block && maybe_result.kind_of?(PostgresPR::Connection::Result)
+ begin
+ puts "PostgresPR extention: about to yield result." if $DEBUG
+ block.call(maybe_result.rows)
+ ensure
+ maybe_result = nil
end
end
- end
- alias :quote :escape_string
+
+ def quote(str)
+ # do nothing
+ str
+ end
+
+ end
end
end
Modified: trunk/ap4r/lib/ap4r/postgresql.sql
===================================================================
--- trunk/ap4r/lib/ap4r/postgresql.sql 2007-08-30 03:59:01 UTC (rev 279)
+++ trunk/ap4r/lib/ap4r/postgresql.sql 2007-08-30 05:49:17 UTC (rev 280)
@@ -2,12 +2,12 @@
id character varying(255) NOT NULL default '',
queue character varying(255) NOT NULL default '',
headers text NOT NULL,
- object bytea NOT NULL,
+ object text NOT NULL,
PRIMARY KEY (id)
-)
+);
CREATE TABLE reliable_msg_topics (
topic character varying(255) NOT NULL default '',
headers text NOT NULL,
- object bytea NOT NULL,
+ object text NOT NULL,
PRIMARY KEY (topic)
-)
+);
More information about the ap4r-devel mailing list