Skip to content

Commit

Permalink
private channel reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaini528 committed Apr 13, 2021
1 parent 309ae3c commit e1939a6
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 35 deletions.
8 changes: 5 additions & 3 deletions src/Api/WebSocket/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public function subscribe(array $sub=[]){
'connection'=>0,
]);
}

//print_r($this->resub($sub));
$this->save('add_sub',$this->resub($sub));
}

Expand Down Expand Up @@ -206,7 +206,9 @@ function test_reconnection(){
];
}

function test_reconnection2(){
$this->client->debug2=1;
function test_reconnection2($key){
$this->client->debug=[
'private'=>[$key=>'close'],
];
}
}
12 changes: 9 additions & 3 deletions src/Api/WebSocket/SocketFunction.php
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private function channelType($sub){
/**
* 重新订阅
*/
private function reconnection($global,$type='public'){
private function reconnection($global,$type='public',array $keysecret=[]){
$all_sub=$global->get('all_sub');
if(empty($all_sub)) return;

Expand All @@ -188,10 +188,16 @@ private function reconnection($global,$type='public'){
foreach ($all_sub as $v){
if(!is_array($v)) $temp[]=$v;
}

$global->save('add_sub',$this->resub($temp));
}else{
$this->keysecret=$keysecret;

$temp=[];
foreach ($all_sub[$keysecret['key']] as $v){
$t=explode(self::$USER_DELIMITER,$v);
$temp[]=$t[1];
}
}

$global->save('add_sub',$this->resub($temp));
}
}
69 changes: 40 additions & 29 deletions src/Api/WebSocket/SocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ private function onMessage($global){
$global->saveQueue($table,$data);
}else{
$global->save($table,$data);
}

//最后数据更新时间
$con->tag_data_time=time();
//成功接收数据重连次数回归0
$con->tag_reconnection_num=0;
//最后数据更新时间
$con->tag_data_time=time();
//成功接收数据重连次数回归0
$con->tag_reconnection_num=0;
}
return;
}

Expand Down Expand Up @@ -230,12 +230,12 @@ private function onMessage($global){
$global->saveQueue($table,$data);
}else{
$global->save($table,$data);
}

//最后数据更新时间
$con->tag_data_time=time();
//成功接收数据重连次数回归0
$con->tag_reconnection_num=0;
//最后数据更新时间
$con->tag_data_time=time();
//成功接收数据重连次数回归0
$con->tag_reconnection_num=0;
}
return;
}

Expand All @@ -251,13 +251,21 @@ private function onClose($global){
$this->log($con->tag.' reconnection');

$this->reconnection($global,'public');

$con->reConnect(10);
}else{
$this->log('connection close '.$con->tag_keysecret['key']);
$this->log('private connection close '.$con->tag_keysecret['key']);

Timer::del($con->timer_other);
//更改为掉线状态
$this->keysecretInit($con->tag_keysecret,[
'connection'=>2,
'connection_close'=>0,
'auth'=>0,
]);

//重新订阅私有频道
$this->reconnection($global,'private',$con->tag_keysecret);
}

$con->reConnect(10);
};
}

Expand Down Expand Up @@ -285,23 +293,18 @@ private function other($con,$global){

$this->log('listen '.$con->tag);

/*if(isset($con->tag_data_time)){
echo time()-$con->tag_data_time;
echo PHP_EOL;
}*/

//公共数据如果60秒内无数据更新,则断开连接重新订阅,重试次数不超过10次
if(isset($con->tag_data_time) && time()-$con->tag_data_time>60*($con->tag_reconnection_num+1) && $con->tag_reconnection_num<=10){
if(in_array($con->tag,$this->public_url)) {
//public
if(in_array($con->tag,$this->public_url)) {
//public
if (isset($con->tag_data_time) && time() - $con->tag_data_time > 60 * ($con->tag_reconnection_num + 1) && $con->tag_reconnection_num <= 10) {
$con->close();
}else{
//private
}

$con->tag_reconnection_num++;
$con->tag_reconnection_num++;

$this->log('listen '.$con->tag.' reconnection_num:'.$con->tag_reconnection_num.' tag_data_time:'.$con->tag_data_time);
$this->log('listen ' . $con->tag . ' reconnection_num:' . $con->tag_reconnection_num . ' tag_data_time:' . $con->tag_data_time);
}
}else{
//private
}
});
}
Expand All @@ -312,10 +315,10 @@ private function other($con,$global){
* @param $global
*/
private function debug($con,$global){
$debug=$global->get('debug');

if(in_array($con->tag,$this->public_url)) {
//public
$debug=$global->get('debug');

if(isset($debug['public']) && $debug['public'][$con->tag]=='close'){
$this->log($con->tag.' debug '.json_encode($debug));

Expand All @@ -326,6 +329,14 @@ private function debug($con,$global){
}
}else{
//private
if(isset($debug['private'][$con->tag_keysecret['key']]) && $debug['private'][$con->tag_keysecret['key']]=='close'){
$this->log($con->tag_keysecret['key'].' debug '.json_encode($debug));

$debug['private'][$con->tag_keysecret['key']]='recon';
$global->save('debug',$debug);

$con->close();
}
}
}

Expand Down

0 comments on commit e1939a6

Please sign in to comment.