caml-list - the Caml user's mailing list
* netplex multi-thread asynchronous processor for passive clients
From: Serge Sivkov @ 2009-04-03 11:19 UTC (permalink / raw)
  To: caml-list

Hello,
I want to convert my synchronous multi-process Netplex service to an
asynchronous multi-threaded one.
I don't understand how I can send data from #receive_message
to #process for an asynchronous worker (ideally more than one per thread).

Here is my current code:
class my_hooks =
  ...
  method receive_admin_message container name args =
    let s = "admin message to container: " ^ name in
    container#log `Info s;
    let aux (proto, fds) =
      Array.iter
        (fun fd ->
           container#socket_service#processor#process
             ~when_done:(fun () -> ()) container fd proto)
        fds in
    List.iter aux container#socket_service#sockets

end

class ts_alfa_processor hooks : Netplex_types.processor =
  ...
  method process ~when_done container fd proto_name =
    let s =
      sprintf "process called with %d and %s" (int_of_file_descr fd)
        proto_name in
    container#log `Info s;
    let ch = Unix.out_channel_of_descr fd in
    let rec aux () =
      let s = (Sexp.to_string (sexp_of_msg dfl_msg)) ^ "\n" in
      output_string ch s;
      flush ch in (* plus a call to aux () here in the synchronous version *)
    aux ()

  method supported_ptypes = [ `Multi_processing; `Multi_threading ]
end

That code doesn't work because the process method, when called from
receive_admin_message, gets the wrong fd as its argument (I assume it is
the parent fd that is used to accept the right one).

What is the right way to write a processor which must:
 - work asynchronously with a multi-threaded workload manager
   (ideally, many jobs per thread)
 - work with clients that don't send any packets
 - get the data to send from messages sent by another service
?



* Re: [Caml-list] netplex multi-thread asynchronous processor for passive clients
From: Gerd Stolpmann @ 2009-04-03 15:51 UTC (permalink / raw)
  To: Serge Sivkov; +Cc: caml-list


On Friday, 03.04.2009, at 17:19 +0600, Serge Sivkov wrote:
> That code doesn't work because the process method, when called from
> receive_admin_message, gets the wrong fd as its argument (I assume it is
> the parent fd that is used to accept the right one).

No, that does not work. process must be called from the right
environment.

> What is the right way to write a processor which must:
>  - work asynchronously with a multi-threaded workload manager
>    (ideally, many jobs per thread)
>  - work with clients that don't send any packets
>  - get the data to send from messages sent by another service

For example, open a Unix domain socket and send the data over that socket.
Right now, there is no faster way that is officially supported. For
convenience, you can define RPC procedures that take care of all the
serialization details.
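
A minimal sketch of the socket idea, assuming only the standard Unix library.
One end of a Unix domain socket pair stands in for the service that produces
the data, the other end for the processor that forwards it to the client. The
names write_all and roundtrip are hypothetical. A real Netplex setup would
more likely use a named socket or an RPC service than socketpair, so this only
illustrates the transport.

```ocaml
(* Sketch only. Needs the unix library, for example:
   ocamlfind ocamlopt -package unix -linkpkg demo.ml *)

(* Write the whole string, looping over partial writes. *)
let write_all (fd : Unix.file_descr) (s : string) : unit =
  let len = String.length s in
  let rec go off =
    if off < len then
      go (off + Unix.write_substring fd s off (len - off))
  in
  go 0

(* Send one line over a Unix domain socket pair and read it back. *)
let roundtrip (msg : string) : string =
  let producer, consumer = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
  write_all producer (msg ^ "\n");
  Unix.close producer;
  let ic = Unix.in_channel_of_descr consumer in
  let line = input_line ic in
  close_in ic;                      (* also closes the consumer descriptor *)
  line

let () = print_endline (roundtrip "payload for the client")
```

In an asynchronous processor the consumer descriptor would be watched by the
event system rather than read with a blocking input_line.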

What could work if sender and receiver are on the same event system: The
sender puts a special event with the message onto the event queue, and
the receiver installs an event handler listening for such events. I
don't have demo code for something like this, however.
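
A hedged sketch of what the event-queue idea might look like with Unixqueue
alone, outside of Netplex. It assumes that sender and receiver really do share
one event system, which is exactly the condition stated above. The exception
Data_arrived and the function deliver are hypothetical names, and the findlib
package name in the comment is an assumption about the installed Ocamlnet.

```ocaml
(* Sketch only. Needs Ocamlnet's Unixqueue; assumed build command:
   ocamlfind ocamlopt -package equeue -linkpkg demo.ml *)

(* Hypothetical event that carries the message as its payload. *)
exception Data_arrived of string

(* Post each message as an Extra event on a private event system and
   return what the handler received. *)
let deliver (msgs : string list) : string list =
  let esys = Unixqueue.create_unix_event_system () in
  let g = Unixqueue.new_group esys in
  let received = ref [] in
  (* Receiver side: accept only our own event, reject everything else so
     that other handlers still get a chance to see it. *)
  Unixqueue.add_handler esys g
    (fun _esys _queue ev ->
       match ev with
       | Unixqueue.Extra (Data_arrived m) -> received := m :: !received
       | _ -> raise Equeue.Reject);
  (* Sender side: put the events onto the same queue. *)
  List.iter
    (fun m -> Unixqueue.add_event esys (Unixqueue.Extra (Data_arrived m)))
    msgs;
  (* Process the queued events. *)
  Unixqueue.run esys;
  List.rev !received

let () = List.iter print_endline (deliver [ "hello" ])
```

This does not show how to reach the event system of another Netplex
container. It only shows the add_event / add_handler pairing on a single
queue.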

Gerd
-- 
------------------------------------------------------------
Gerd Stolpmann * Viktoriastr. 45 * 64293 Darmstadt * Germany 
gerd@gerd-stolpmann.de          http://www.gerd-stolpmann.de
Phone: +49-6151-153855                  Fax: +49-6151-997714
------------------------------------------------------------




* Re: [Caml-list] netplex multi-thread asynchronous processor for  passive clients
From: Serge Sivkov @ 2009-04-03 17:55 UTC (permalink / raw)
  To: Gerd Stolpmann; +Cc: caml-list

Hello,

I've looked through the Netplex sources. I now clearly see how a
synchronous service works, but I don't understand the workflow of the
asynchronous version of Netplex services (I'm using ocamlnet-2.2.9).
According to the manual, to implement an async version of the processor I
must return control to the container as soon as the first message is sent,
without calling when_done.
At that moment the container calls #setup_polling, but I don't understand
how this code can jump back to #process.

Do services started from one Netplex_main.startup share the same event
system when the workload_manager sections in their configuration file are
identical?

I've written some code, but it doesn't work :(

exception DataArrived

(* from #process of the receiver service I call *)
container#event_system#add_event (Extra DataArrived)

(* and in #process of the sender I do something like *)
let g = container#event_system#new_group () in
container#event_system#add_handler g
  (fun _ _ evt -> output_string ch "...\n"; flush ch)

but no event has been detected in the sender service...
Where am I wrong?




* Re: [Caml-list] netplex multi-thread asynchronous processor for  passive clients
From: Jake Donham @ 2009-04-03 18:13 UTC (permalink / raw)
  To: Serge Sivkov; +Cc: Gerd Stolpmann, caml-list

On Fri, Apr 3, 2009 at 10:55 AM, Serge Sivkov <ssp.mryau@gmail.com> wrote:
> I've looked through the Netplex sources. I now clearly see how a
> synchronous service works, but I don't understand the workflow of the
> asynchronous version of Netplex services (I'm using ocamlnet-2.2.9).

Can you explain what you're trying to achieve? I don't understand why
you want to connect #receive_message to #process. You might find it
easier to use Ocamlnet's RPC implementation to build an asynchronous
service. At least, looking at rpc_server.ml as an example of an
asynchronous service will be useful.

You may also be interested in orpc, which gives you a way to generate
RPC services from an OCaml signature:

  http://code.google.com/p/orpc2/

(new release coming soon.)

Jake



* Re: [Caml-list] netplex multi-thread asynchronous processor for  passive clients
From: Serge Sivkov @ 2009-04-04 14:24 UTC (permalink / raw)
  To: Gerd Stolpmann; +Cc: caml-list

Hello,
why doesn't the following code react to messages sent to the container (or
to events sent from another service)?

exception BeLive
exception DataArrived
...
  method receive_admin_message container name args =
    container#event_system#add_event (Extra DataArrived)

...
  method process ~when_done container fd proto_name =
    let ch = Unix.out_channel_of_descr fd in
    let rec aux () =
      let s = (Sexp.to_string (sexp_of_msg dfl_msg)) ^ "\n" in
      output_string ch s;
      flush ch;
      Unix.sleep 1;
      container#event_system#add_event (Extra BeLive)
    in
    let cb _ _ = function
      | Extra DataArrived ->
        aux()
      | Extra BeLive ->
        Unix.sleep 1;
        container#event_system#add_event (Extra BeLive)
      | _ ->
        raise Equeue.Reject
    in
    let g = container#event_system#new_group() in
    container#event_system#add_handler g cb;
    aux ()

Also, I'm still interested in answers to my previous question :) :
Do services started from one Netplex_main.startup share the same event
system when the workload_manager sections in their configuration file are
identical?

How can I find out whether some objects share the same event queue?
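
One general point about the code in this message: Unix.sleep blocks the
calling thread, so while a handler sleeps, the event loop running in that
thread cannot dispatch anything else. Whether this explains the behaviour
reported here is not established in this thread. As a sketch under that
caveat, a periodic keep-alive could be driven by a Unixqueue timer instead,
assuming Unixqueue.once is available in the installed Ocamlnet. The function
run_ticks is a hypothetical name, and it uses a private event system rather
than container#event_system.

```ocaml
(* Sketch only. Needs Ocamlnet's Unixqueue (assumed findlib package: equeue). *)

(* Fire [n] ticks, [period] seconds apart, without sleeping inside a
   handler. Returns the number of ticks that fired. *)
let run_ticks (n : int) (period : float) : int =
  let esys = Unixqueue.create_unix_event_system () in
  let fired = ref 0 in
  let rec arm () =
    (* A fresh group per timer keeps each timer independent. *)
    let g = Unixqueue.new_group esys in
    Unixqueue.once esys g period tick
  and tick () =
    incr fired;                     (* a keep-alive would be sent here *)
    if !fired < n then arm ()
  in
  if n > 0 then arm ();
  (* The loop stays free to dispatch other events between ticks. *)
  Unixqueue.run esys;
  !fired

let () = Printf.printf "ticks: %d\n" (run_ticks 3 0.05)
```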



* Re: [Caml-list] netplex multi-thread asynchronous processor for  passive clients
From: Serge Sivkov @ 2009-04-05 16:27 UTC (permalink / raw)
  To: Gerd Stolpmann; +Cc: caml-list

So, I got my service running, partly. Currently I'm using ORPC and
synchronous calls to my RPC server. I simply can't start an async RPC
client from the processor's #process.
Just after:
    let clnt = Protocol_clnt.create_client
      ~esys: container#event_system
      (Rpc_client.Inet ("localhost", 9007)) Rpc.Tcp in
    Protocol_clnt.new_signal'async clnt () cb;
I see in the log:
Exception File "unixqueue.ml", line 954, characters 6-12: Assertion failed
But the synchronous client works.

So here is my list of questions:
- why can't I use the nice Netplex message bus from a processor?
- how can I find out whether some objects share the same event queue?
- why can't I start an async RPC client from a processor using
  container#event_system?
  (that queue is usable from the processor: I can send and receive events
  using it)
- how can I use container#send_message from a processor?
  (a simple call to it silently hangs the container's RPC)

