Class MCollective::Client
In: lib/mcollective/client.rb
Parent: Object

Helpers for writing clients that can talk to agents, do discovery and so forth

Methods

Attributes

options  [RW] 
stats  [RW] 

Public Class methods

[Source]

    # File lib/mcollective/client.rb, line 6
 6:     def initialize(configfile)
 7:       @config = Config.instance
 8:       @config.loadconfig(configfile) unless @config.configured
 9: 
10:       @connection = PluginManager["connector_plugin"]
11:       @security = PluginManager["security_plugin"]
12: 
13:       @security.initiated_by = :client
14:       @options = nil
15:       @subscriptions = {}
16: 
17:       @connection.connect
18:     end

Public Instance methods

Returns the configured main collective unless a specific collective has been set in the options

[Source]

    # File lib/mcollective/client.rb, line 22
22:     def collective
23:       if @options[:collective].nil?
24:         @config.main_collective
25:       else
26:         @options[:collective]
27:       end
28:     end

Disconnects cleanly from the middleware

[Source]

    # File lib/mcollective/client.rb, line 31
31:     def disconnect
32:       Log.debug("Disconnecting from the middleware")
33:       @connection.disconnect
34:     end

Performs a discovery of nodes matching the supplied filter and returns an array of nodes

An integer limit can be supplied; discovery is then cancelled as soon as the requested number of hosts has replied

[Source]

     # File lib/mcollective/client.rb, line 114
114:     def discover(filter, timeout, limit=0)
115:       raise "Limit has to be an integer" unless limit.is_a?(Fixnum)
116: 
117:       begin
118:         hosts = []
119:         Timeout.timeout(timeout) do
120:           reqid = sendreq("ping", "discovery", filter)
121:           Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")
122: 
123:           loop do
124:             reply = receive(reqid)
125:             Log.debug("Got discovery reply from #{reply.payload[:senderid]}")
126:             hosts << reply.payload[:senderid]
127: 
128:             return hosts if limit > 0 && hosts.size == limit
129:           end
130:         end
131:       rescue Timeout::Error => e
132:       rescue Exception => e
133:         raise
134:       ensure
135:         unsubscribe("discovery", :reply)
136:       end
137: 
138:       hosts.sort
139:     end

Performs a discovery and then sends a request, calling the passed block for each response

   times = discovered_req("status", "mcollectived", options) {|resp|
      pp resp
   }

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 201
201:     def discovered_req(body, agent, options=false)
202:       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
203: 
204:       options = @options unless options
205: 
206:       STDOUT.sync = true
207: 
208:       print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")
209: 
210:       begin
211:         discovered_hosts = discover(options[:filter], options[:disctimeout])
212:         discovered = discovered_hosts.size
213:         hosts_responded = []
214:         hosts_not_responded = discovered_hosts
215: 
216:         stat[:discoverytime] = Time.now.to_f - stat[:starttime]
217: 
218:         puts("#{discovered}\n\n")
219:       rescue Interrupt
220:         puts("Discovery interrupted.")
221:         exit!
222:       end
223: 
224:       raise("No matching clients found") if discovered == 0
225: 
226:       begin
227:         Timeout.timeout(options[:timeout]) do
228:           reqid = sendreq(body, agent, options[:filter])
229: 
230:           (1..discovered).each do |c|
231:             resp = receive(reqid)
232: 
233:             hosts_responded << resp.payload[:senderid]
234:             hosts_not_responded.delete(resp.payload[:senderid]) if hosts_not_responded.include?(resp.payload[:senderid])
235: 
236:             yield(resp.payload)
237:           end
238:         end
239:       rescue Interrupt => e
240:       rescue Timeout::Error => e
241:       end
242: 
243:       stat[:totaltime] = Time.now.to_f - stat[:starttime]
244:       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
245:       stat[:responses] = hosts_responded.size
246:       stat[:responsesfrom] = hosts_responded
247:       stat[:noresponsefrom] = hosts_not_responded
248:       stat[:discovered] = discovered
249: 
250:       @stats = stat
251:       return stat
252:     end

Prints out the stats returned from req and discovered_req in a nice way

[Source]

     # File lib/mcollective/client.rb, line 255
255:     def display_stats(stats, options=false, caption="stomp call summary")
256:       options = @options unless options
257: 
258:       if options[:verbose]
259:         puts("\n---- #{caption} ----")
260: 
261:         if stats[:discovered]
262:           puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
263:         else
264:           puts("           Nodes: #{stats[:responses]}")
265:         end
266: 
267:         printf("      Start Time: %s\n", Time.at(stats[:starttime]))
268:         printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
269:         printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
270:         printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
271: 
272:       else
273:         if stats[:discovered]
274:           printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
275:         else
276:           printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
277:         end
278:       end
279: 
280:       if stats[:noresponsefrom].size > 0
281:         puts("\nNo response from:\n")
282: 
283:         stats[:noresponsefrom].each do |c|
284:           puts if c % 4 == 1
285:           printf("%30s", c)
286:         end
287: 
288:         puts
289:       end
290:     end

Blocking call that waits forever for a message to arrive.

If you give it a requestid this means you've previously sent a request with that ID and now just want the replies that match it. In that case the current connection will ignore all messages not directed at it and keep waiting until it finds a matching message.

[Source]

     # File lib/mcollective/client.rb, line 84
 84:     def receive(requestid = nil)
 85:       reply = nil
 86: 
 87:       begin
 88:         reply = @connection.receive
 89:         reply.type = :reply
 90:         reply.expected_msgid = requestid
 91: 
 92:         reply.decode!
 93: 
 94:         reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
 95: 
 96:         raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
 97:       rescue SecurityValidationFailed => e
 98:         Log.warn("Ignoring a message that did not pass security validations")
 99:         retry
100:       rescue MsgDoesNotMatchRequestID => e
101:         Log.debug("Ignoring a message for some other client")
102:         retry
103:       end
104: 
105:       reply
106:     end

Sends a request, calling the passed block for each response

times = req("status", "mcollectived", options) {|resp|

  pp resp

}

It returns a hash of times. The timeout for the run is taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 149
149:     def req(body, agent=nil, options=false, waitfor=0)
150:       if body.is_a?(Message)
151:         agent = body.agent
152:         options = body.options
153:         waitfor = body.discovered_hosts.size || 0
154:       end
155: 
156:       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
157: 
158:       options = @options unless options
159: 
160:       STDOUT.sync = true
161: 
162:       hosts_responded = 0
163: 
164:       begin
165:         Timeout.timeout(options[:timeout]) do
166:           reqid = sendreq(body, agent, options[:filter])
167: 
168:           loop do
169:             resp = receive(reqid)
170: 
171:             hosts_responded += 1
172: 
173:             yield(resp.payload)
174: 
175:             break if (waitfor != 0 && hosts_responded >= waitfor)
176:           end
177:         end
178:       rescue Interrupt => e
179:       rescue Timeout::Error => e
180:       ensure
181:         unsubscribe(agent, :reply)
182:       end
183: 
184:       stat[:totaltime] = Time.now.to_f - stat[:starttime]
185:       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
186:       stat[:responses] = hosts_responded
187:       stat[:noresponsefrom] = []
188: 
189:       @stats = stat
190:       return stat
191:     end

Sends a request and returns the generated request id. It doesn't wait for responses and doesn't execute any passed-in code blocks for responses

[Source]

    # File lib/mcollective/client.rb, line 38
38:     def sendreq(msg, agent, filter = {})
39:       if msg.is_a?(Message)
40:         request = msg
41:         agent = request.agent
42:       else
43:         ttl = @options[:ttl] || @config.ttl
44:         request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
45:         request.reply_to = @options[:reply_to] if @options[:reply_to]
46:       end
47: 
48:       request.encode!
49: 
50:       Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
51: 
52:       subscribe(agent, :reply)
53: 
54:       request.publish
55: 
56:       request.requestid
57:     end

[Source]

    # File lib/mcollective/client.rb, line 59
59:     def subscribe(agent, type)
60:       unless @subscriptions.include?(agent)
61:         subscription = Util.make_subscriptions(agent, type, collective)
62:         Log.debug("Subscribing to #{type} target for agent #{agent}")
63: 
64:         Util.subscribe(subscription)
65:         @subscriptions[agent] = 1
66:       end
67:     end

[Source]

    # File lib/mcollective/client.rb, line 69
69:     def unsubscribe(agent, type)
70:       if @subscriptions.include?(agent)
71:         subscription = Util.make_subscriptions(agent, type, collective)
72:         Log.debug("Unsubscribing #{type} target for #{agent}")
73: 
74:         Util.unsubscribe(subscription)
75:         @subscriptions.delete(agent)
76:       end
77:     end

[Validate]