PHP Classes

File: WebSocket.class.php

Recommend this page to a friend!
  Classes of Nathan Bruer   Web Socket Service   WebSocket.class.php   Download  
File: WebSocket.class.php
Role: Class source
Content type: text/plain
Description: Service class designed to be the master process, which forks the new processes and handles the distribution of cross data communication between the child processes.
Class: Web Socket Service
Handle Web socket accesses using child processes
Author: By
Last change: Adapted to PHP 5.4, which fully deprecated (removed) call-time pass-by-reference, i.e. passing arguments by reference at the function call site.
Date: 11 years ago
Size: 6,486 bytes
 

Contents

Class file image Download
<?php
/**
 * Minimal console logger used by the master and child processes.
 */
class Console{
    /**
     * Prints one log line of the form "[date]<pid>: message" to standard output.
     *
     * The method is static because every caller invokes it as Console::log();
     * calling a non-static method statically is a fatal error on PHP 8.
     * A static method can still be called through an instance, so existing
     * instance-style callers keep working.
     *
     * @param mixed $d Message to print (formatted with %s).
     * @return void
     */
    public static function log($d){
        $pid = getmypid();
        // Derive an ANSI colour escape from the PID so each process gets a
        // recognisable look: bits 0-2 select the foreground colour, bits 3-5
        // the background colour and bit 6 toggles underline.
        $color = "\033[1;".(30 + ($pid & 7)).";".(40 + (($pid >> 3) & 7)).((($pid >> 6) & 1)?';4':'')."m";
        printf("[%s]{$color}%-5s\033[0m: %s\n", date('m/d/Y h:i:s'), $pid, $d);
    }
}
/**
 * Master process of the web socket service.
 *
 * The constructor opens the listening socket and then loops forever in the
 * master process. For every accepted client it forks a child process and
 * keeps one end of a UNIX socket pair to talk to that child. In the child
 * process the constructor RETURNS, leaving two globals for the caller:
 *   - $child  : the accepted client socket
 *   - $parent : the child's end of the socket pair to the master
 *
 * Messages exchanged over the socket pairs are framed as a 3-byte big-endian
 * payload length followed by the payload. The first payload byte is a command:
 *   - 'c' (CHILD_PROCESS_RESPONSE_CLOSE)    : the child is going away
 *   - 'i' (CHILD_PROCESS_RESPONSE_RELAY_TO) : "i<pid>,<pid>,...:<data>", <data>
 *                                             is re-framed and sent to those PIDs
 *   - anything else                         : the whole frame is sent to every child
 * Relaying is done in a short-lived forked helper process, which sets the
 * globals $forked = true and $child = true before exiting so that __destruct()
 * does not tear down the children.
 */
class WebSocket{
    // How long in seconds to cleanup connections and garbage collect.
    const CLEANUP_TIMER = 10;

    const CHILD_PROCESS_RESPONSE_CLOSE = 'c';
    const CHILD_PROCESS_RESPONSE_RELAY_TO = 'i';

    // Listening (master) socket.
    public $link;
    // Timestamp taken when the server loop was started.
    public $lastCleanup = 0;
    // Map of child PID => master's end of the socket pair to that child.
    public $children = array();

    /**
     * Creates the listening socket and runs the accept/dispatch loop.
     *
     * Never returns in the master process; returns in each forked child
     * (see the class comment for the globals that are set in that case).
     * Any failure while setting up the listening socket terminates via die().
     *
     * @param string $address Address to bind to.
     * @param int    $port    TCP port to listen on.
     */
    function __construct($address,$port){
        Console::log("Creating WebSocket");
        ($this->link = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) || die("socket_create() failed");
        Console::log("Configuring WebSocket");
        socket_set_option($this->link, SOL_SOCKET, SO_REUSEADDR, 1) || die("socket_option() failed");
        Console::log("Binding WebSocket");
        socket_bind($this->link, $address, $port) || die("socket_bind() failed");
        Console::log("Listening on WebSocket");
        socket_listen($this->link) || die("socket_listen() failed");
        // NOTE(review): this message is logged but no socket_set_nonblock()
        // call follows; the listening socket stays in blocking mode.
        Console::log("Setting WebSocket to non-blocking");

        Console::log("Server Started");
        Console::log("Listening on: $address port $port");
        Console::log("Master socket: ".$this->describeSocket($this->link));

        $this->lastCleanup = time();
        while(true){
            $this->reapChildren();

            $connections = array_merge(array('master' => $this->link), $this->children);
            // socket_select() takes its arrays by reference, so real
            // variables are required for the unused write/except sets.
            $write = null;
            $except = null;
            $ready = socket_select($connections, $write, $except, static::CLEANUP_TIMER);
            if($ready === false){
                // On failure the read set is left untouched; acting on it
                // would mean blocking reads on sockets that are not ready.
                Console::log("WebSocket: socket_select() failed: ".socket_strerror(socket_last_error()));
                usleep(10000); // avoid spinning if the error persists
                continue;
            }
            foreach($connections as $connection){
                if($connection === $this->link){
                    // Is a connection request from the web browser
                    if($this->acceptClient())
                        return; // we are the freshly forked child
                }else{
                    // Child communicating with parent
                    $this->handleChildMessage($connection);
                }
            }
        }
    }

    /**
     * Collects exited child processes without blocking and drops their sockets.
     */
    private function reapChildren(){
        while(($child_id = pcntl_wait($status, WNOHANG)) > 0){ // Removes zombie children if they exist
            Console::log("Removed zombie: $child_id");
            if(isset($this->children[$child_id])){
                socket_close($this->children[$child_id]);
                unset($this->children[$child_id]);
            }
        }
    }

    /**
     * Accepts one pending client and forks a child process to serve it.
     *
     * @return bool true when running in the new child process (the constructor
     *              must return), false in the master process.
     */
    private function acceptClient(){
        global $child;
        $client = @socket_accept($this->link);
        if(!$client){
            Console::log("Web Client connection attempted but failed");
            return false;
        }
        if(!socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $ary)){
            Console::log("Could not create socket pair for child process");
            socket_close($client);
            return false;
        }
        $pid = pcntl_fork();
        if($pid == -1){
            Console::log("Could not fork process");
            // Nobody will serve this client: release every descriptor.
            socket_close($ary[0]);
            socket_close($ary[1]);
            socket_close($client);
            return false;
        }
        if($pid){
            // Parent
            socket_close($ary[0]);
            // The child owns the client connection. Close the master's copy
            // so the connection really ends when the child closes its own.
            socket_close($client);
            $this->children[$pid] = $ary[1];
            $child = false;
            return false;
        }
        // Child
        socket_close($ary[1]);
        global $parent;
        $parent = $ary[0];
        $child = $client;
        return true;
    }

    /**
     * Reads one framed message from a child and acts on its command byte.
     *
     * Relay commands are executed in a forked helper process which never
     * returns from this method (it exits once the data has been sent).
     *
     * @param resource|object $connection Master's end of the child's socket pair.
     */
    private function handleChildMessage($connection){
        global $child, $forked;
        Console::log("Getting data from child");
        $l = socket_recv($connection, $len_data, 3, MSG_WAITALL);
        if(!$l){
            // Read error or the child closed its end.
            $this->closeChild(array_search($connection, $this->children, true));
            return;
        }
        if($l !== 3){
            // A truncated length header cannot be decoded.
            Console::log("WebSocket: Not enough data received from child process");
            $this->closeChild(array_search($connection, $this->children, true));
            return;
        }
        $len = (ord($len_data[0]) << 16) | (ord($len_data[1]) << 8) | ord($len_data[2]);
        Console::log("Got data of length: $len from child");
        if(!$len)
            return;
        if(socket_recv($connection, $data, $len, MSG_WAITALL) !== $len){
            Console::log("WebSocket: Not enough data received from child process");
            $this->closeChild(array_search($connection, $this->children, true));
            return;
        }
        Console::log("Got data from child!");

        $command = $data[0];
        if($command === self::CHILD_PROCESS_RESPONSE_CLOSE){
            Console::log("WebSocket: Received notice from child about death");
            $this->closeChild(array_search($connection, $this->children, true));
            return;
        }

        if(!$this->fork())
            return; // master process (or fork failure): keep serving

        // From here on we are the short-lived relay helper. Flag it right
        // away so that __destruct() leaves the children alone on every exit path.
        $forked = true;
        $child = true;
        if($command === self::CHILD_PROCESS_RESPONSE_RELAY_TO){
            $this->relayToPids($data);
        }else{
            Console::log("Relaying to all children");
            $this->relayToAll($len_data . $data);
        }
        exit;
    }

    /**
     * Handles an "i<pid>,<pid>,...:<data>" payload: re-frames <data> and
     * sends it to each listed child that is known to this process.
     *
     * @param string $data Payload including the leading command byte.
     */
    private function relayToPids($data){
        $colon = strpos($data, ':');
        $pids = array();
        if($colon !== false){
            // The PID list sits between the command byte and the colon.
            $pids = array_filter(array_map('intval', explode(',', (string) substr($data, 1, $colon - 1))));
        }
        if(!$pids){
            Console::log("No Process IDS passed with packet, forgetting packet!");
            return;
        }
        $payload = (string) substr($data, $colon + 1);
        $len = strlen($payload);
        if($len > 0xFFFFFF){
            Console::log("Could not send packet too large!");
            return;
        }
        // 3-byte big-endian length prefix.
        $frame = chr(($len >> 16) & 0xFF).chr(($len >> 8) & 0xFF).chr($len & 0xFF).$payload;
        foreach($pids as $pid){
            if(!isset($this->children[$pid]))
                continue;
            if(!$this->sendAll($this->children[$pid], $frame)){
                Console::log("WebSocket: Failed to send data to: ".$this->describeSocket($this->children[$pid]));
                $this->closeChild($pid);
            }
        }
    }

    /**
     * Sends an already framed message to every known child.
     *
     * @param string $frame Length prefix plus payload.
     */
    private function relayToAll($frame){
        foreach($this->children as $k => $c){
            if(!$this->sendAll($c, $frame)){
                Console::log("WebSocket: Failed to send data to: ".$this->describeSocket($c));
                $this->closeChild($k);
            }
        }
    }

    /**
     * Writes the whole buffer to a socket, retrying after partial writes.
     *
     * @param resource|object $socket Destination socket.
     * @param string          $data   Bytes to send.
     * @return bool false as soon as socket_send() reports an error, true otherwise.
     */
    private function sendAll($socket, $data){
        $len = strlen($data);
        $i = 0;
        do{
            $da = substr($data, $i);
            $d = @socket_send($socket, $da, strlen($da), 0);
            if($d === false)
                return false;
            if(!$d)
                usleep(25); // nothing was written; back off briefly before retrying
            $i += $d;
        }while($len > $i);
        return true;
    }

    /**
     * Returns a printable label for a socket.
     *
     * Socket resources convert to "Resource id #N"; socket objects (PHP 8)
     * cannot be converted to string, so class name and object id are used.
     *
     * @param resource|object $socket
     * @return string
     */
    private function describeSocket($socket){
        if(is_object($socket))
            return get_class($socket).' #'.spl_object_id($socket);
        return (string) $socket;
    }

    /**
     * Forks the current process and gives the new process a 30 second time limit.
     *
     * @return bool true in the new (forked) process; false in the calling
     *              process and when the fork failed.
     */
    public function fork(){
        $id = pcntl_fork();
        if($id == -1){
            Console::log("Could not fork process");
            return false;
        }
        if($id){
            // parent
            return false;
        }
        // child
        set_time_limit(30);
        return true;
    }

    /**
     * Sends a close frame to a child, closes its socket and forgets it.
     *
     * Does nothing for an empty or unknown process id (callers pass the
     * result of array_search(), which is false when nothing was found).
     *
     * @param int|false $process_id PID key into $this->children.
     */
    public function closeChild($process_id){
        if(!$process_id || !isset($this->children[$process_id]))
            return;
        $connection = $this->children[$process_id];
        // Frame: length 1, payload 'c' (close). Best effort only.
        @socket_send($connection, "\x00\x00\x01c", 4, 0);
        @socket_close($connection);
        unset($this->children[$process_id]);
    }

    /**
     * Shuts down all children and the listening socket, but only in the
     * master process: when the global $child is truthy (child or relay
     * helper process) nothing is closed.
     */
    public function __destruct(){
        global $child;
        if($child)
            return;
        // Iterate over the keys only so the global $child is not overwritten.
        foreach(array_keys($this->children) as $pid)
            $this->closeChild($pid);
        if($this->link)
            @socket_close($this->link);
    }
}