熟悉 swoole 和 socket 编程的大佬能进来帮忙看看这个问题么?

2019-01-29 13:54:10 +08:00
 echo404

1.背景

我司最近将消息中间件改为了 mqtt,因为 mqtt 的特性(同一个 client_id 不能进行多次连接),导致客户端与 mqtt 服务器之间必须使用一个长连接。一开始我是用队列 while 循环拉取来实现发消息这个操作的。后来闲的蛋疼加上想学点新的东西,就想用 swoole 写个 tcp 服务器,然后就入了坑了.....

2、问题

现在,我在每个 task 进程中都启动了一个 mqtt 连接,在压测时,这个 mqtt 连接经常会出现 errno=11 (资源不可用)的错误,我查了下相关资料,这个错误出现的原因是因为 mqtt 连接的 socket 写缓冲区满了,消息无法写进写缓冲区。关于这块我很疑惑,task 进程不应该是个单线程的进程么?这样 mqtt 连接一次只处理一个消息,那为什么还会出现写缓冲区满的错误呢? 另外一个问题就是在测试性能时,我用 swoole 写的这个服务器性能竟然和 while 循环拉取队列的性能差不多,差不多都是每秒发送 380 条消息左右。我都要疯了,究竟是我什么地方写的不对,我现在都怀疑人生了!!!!

3、代码

<?php
namespace swoole;

//如果以守护进程启动后,必须使用绝对地址
include_once __DIR__."/mqtt/MessageId.php";
include_once __DIR__."/mqtt/phpMQTT.php";

class TcpServer
{
    private $serv;
    private $mqtt_config;
    private $mqtt_connect;
    const PROCESS_NAME = 'swoole';

    public function __construct()
    {
        //创建 Server 对象,监听 127.0.0.1:9501 端口
        $this->serv = new \swoole_server('0.0.0.0', 9501);

        //设置属性
        $this->serv->set(array(
            //守护进程化。设置 daemonize => 1 时,程序将转入后台作为守护进程运行。长时间运行的服务器端程序必须启用此项。
            //启用守护进程后,CWD (当前目录)环境变量的值会发生变更,相对路径的文件读写会出错。PHP 程序中必须使用绝对路径
            'daemonize' => true,
            'worker_num' => 2, //异步非阻塞代码一般设为 CPU 的 1-4 倍。
            'max_request' => 10000, //一个 worker 进程在处理完超过此数值的任务后将自动退出,主要作用是解决 PHP 进程内存溢出问题
            'max_conn' => 1024, //进程最大连接数
            'task_worker_num' => 20, //Task 进程最大值不得超过 cpu_num*1000,该进程是同步阻塞的,里面不得调用异步 IO 函数
            'task_ipc_mode' => 3, //worker 进程与 task 进程之间的通信模式,3 为队列通信并且设置为了争抢模式,使用消息队列通信,如果 Task 进程处理能力低于投递速度,可能会引起 Worker 进程阻塞。
            'message_queue_key' => 0x72000100, //指定消息队列的 key
            'task_max_request' => 10000, //task 进程最大任务数
            'log_file' => '/tmp/swoole.log', //日志文件
            'log_level' => 4, //需要记录的错误级别
            'tcp_fastopen' => true, //开启 TCP 快连接
        ));

        //监听服务启动事件
        $this->serv->on('Start', array($this, 'onStart'));
        //监听管理进程启动事件
        $this->serv->on('ManagerStart', array($this, 'onManagerStart'));
        //监听工作进程启动事件
        $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
        //监听工作进程异常退出事件
        $this->serv->on('WorkerError', array($this, 'onWorkerError'));
        //监听工作检测停止事件
        $this->serv->on('WorkerStop', array($this, 'onWorkerStop'));
        //监听连接进入事件
        $this->serv->on('Connect', array($this, 'onConnect'));
        //监听数据接受事件
        $this->serv->on('Receive', array($this, 'onReceive'));
        //监听连接关闭事件
        $this->serv->on('Close', array($this, 'onClose'));
        //监听 task 进程接收任务事件
        $this->serv->on('Task', array($this, 'onTask'));
        //监听 Task 进程完成任务事件
        $this->serv->on('Finish', array($this, 'onFinish'));

        //启动服务器
        $this->serv->start();
    }

    //onStart 回调中,仅允许 echo、打印 Log、修改进程名称。不得执行其他操作
    public function onStart($serv)
    {
        swoole_set_process_name(self::PROCESS_NAME.'_master');
    }

    //在这个回调函数中可以修改管理进程的名称
    public function onManagerStart($serv)
    {
        swoole_set_process_name(self::PROCESS_NAME.'_manager');
    }

    //此事件在 Worker 进程 /Task 进程启动时发生,这里创建的对象可以在进程生命周期内使用
    public function onWorkerStart($serv, $worker_id)
    {
        //引入常用函数文件,由于可能会发送更改,所以在 worker 进程开始时引用
        include_once __DIR__.'/mqtt/__cron.php';
        $jobType = $serv->taskworker ? 'Tasker' : 'Worker';
        swoole_set_process_name(self::PROCESS_NAME.'_'.$jobType.'_'.$worker_id);
        //在 task 进程中启动 mqtt 连接
        if ($serv->taskworker) {
            echo "Task 进程({$worker_id})启动\r\n";
            //获取配置
            $this->mqtt_config = get_mqtt_config($worker_id);
            //连接服务器(这里为了以后能加入多个 mqtt 实例,所以我们将连接放入一个数组中)
            foreach ($this->mqtt_config as $key=>$item) {
                $mqtt = get_mqtt($item, 'o2o_mqtt', []);
                //如果没有连接上 mqtt 服务器,关闭当前进程
                if (!$mqtt) {
                    $serv->stop($worker_id, true);
                }
                $mqtt_arr[$key] = $mqtt;
            }
            $this->mqtt_connect = $mqtt_arr;
            //30S 发送一次心跳包
            $serv->tick(30000, function () use ($serv, $worker_id) {
                //发送心跳包
                foreach ($this->mqtt_config as $key=>$item) {
                    if (!$this->mqtt_connect[$key]['obj']->ping()) {
                        //如果 ping 失败就重新连接
                        echo "{$item['addr']} ping 失败,退出当前 task 进程($worker_id)\r\n";
                        $mqtt = get_mqtt($item, 'o2o_mqtt', []);
                        if (!$mqtt) {
                            $serv->stop($worker_id, true);
                        }
                        $this->mqtt_connect[$key] = $mqtt;
                    }
                }
            });
        }

        //在 worker 进程判断文件是否更新
        if (!$serv->taskworker) {
            //清除文件状态缓存,这个是为了防止下面 filemtime 从缓存中读取
            clearstatcache();
            $filemtime = filemtime(__FILE__);
            //30S 检测一次文件更新
            $serv->tick(30000, function () use ($serv, $worker_id, $filemtime) {
                //检查文件更新
                clearstatcache();
                //如果文件变化,则重启所有的 work 进程
                if ($filemtime != filemtime(__FILE__)) {
                    echo "文件更新,重启所有 woker/task 进程\r\n";
                    $serv->reload();
                }
            });
        }
    }

    public function onWorkerError($serv, $worker_id, $worker_pid, $exit_code, $signal)
    {
        echo "{$worker_id} Error\r\n";
    }

    //此函数在 Worker 进程中执行
    public function onWorkerStop($serv,$worker_id)
    {
        //zend_opcache 的 opcache 清理函数,防止某些服务器开启了 opcache
        opcache_reset();
    }

    //此函数在 Worker 进程中执行
    public function onConnect($serv, $fd)
    {
        //echo "Client: connect.\n";
    }

    //此函数在 Worker 进程中执行
    public function onReceive($serv, $fd, $from_id, $data)
    {
        $param['data'] = json_decode($data,true);
        $param['fd'] = $fd;
        //向 task 进程投递任务
        $serv->task(json_encode($param));
    }

    //此函数在 Task 进程中执行
    public function onTask($serv, $task_id, $src_worker_id, $data)
    {
        $st = microtime(true);
        $param = json_decode($data, true);
        $data = $param['data'];
        $fd = $param['fd'];
        $return = ['code' => 2, 'msg' => 'mqtt 消息发送失败'];
        foreach ($this->mqtt_connect as $key=>$value) {
            if ($value['minDevNo'] < $data['device_id'] && $value['maxDevNo'] > $data['device_id']) {
                $res = send_message($value['obj'], $data['mqtt_topic'], $data['message']);
                if ($res) {
                    $return['code'] = 1;
                    $return['msg'] = 'mqtt 消息发送成功';
                    echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."成功".time()."\r\n";
                }else{
                    //断线重连
                    echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."失败,重新连接 mqtt\r\n";
                    foreach ($this->mqtt_config as $k=>$item) {
                        $mqtt = get_mqtt($item, 'o2o_mqtt', []);
                        if (!$mqtt) {
                            $serv->stop($serv->worker_id, true);
                        }
                        $mqtt_arr[$k] = $mqtt;
                    }
                    $this->mqtt_connect = $mqtt_arr;
                }
            }
        }
        $res = json_encode($return);
        $serv->send($fd, $res);
        $et = microtime(true);
        echo "任务{$src_worker_id}-{$serv->worker_id}-{$task_id}完成,花费时间".($et-$st)."S\r\n";
        return $res;
    }

    //此函数在 worker 进程中执行
    public function onFinish($serv, $task_id, $data)
    {
        //echo "{$task_id}回调完成\r\n";
    }

    public function onClose($server, $fd, $reactorId)
    {
        //echo "Client: close.\n";
    }
}

$serv = new TcpServer();
3982 次点击
所在节点    PHP
14 条回复
wo642436249
2019-01-29 16:05:09 +08:00
干嘛不用协程
echo404
2019-01-29 16:12:46 +08:00
@wo642436249 刚接触 swoole,异步非阻塞就已经写得很费力了,暂时没有能力去写个协程版的,而且问题应该也不是在这块吧
puritania
2019-01-29 16:21:20 +08:00
所以我选择 golang
AngryPanda
2019-01-29 16:22:01 +08:00
[task 进程不应该是个单线程的进程么?]

我的理解不是。比如你只开了 2 个 worker,难道只能有 2 个请求被并行处理?那并发数怎么可能上的去呢?
triptipstop
2019-01-29 16:23:25 +08:00
写不好 PHP 的才用 Go
AngryPanda
2019-01-29 16:27:11 +08:00
我最近也是第一次用 swoole 来写了一个应用,使用的协程接口。

性能上的确提升很大,然而写法让人很不习惯。这点比 golang 难用多了。
echo404
2019-01-29 16:34:16 +08:00
@AngryPanda 测试服务器是我自己的一个 1 核 2G 的小水管,所以按文档中所说,worker 进程是 CPU 数的 1-4 倍,我只开了 2 个进程,每个 worker 进程处理的最大连接数为 1024,2 个进程就同时接收 2048 个请求。超出这个数值之后,如果再有请求去连接这个 TCP 服务器应该会报错误,但是我这边的压测日志中,并没有记录到对应的错误,所以应该还没有到达最大并发量才对
ferock
2019-01-29 16:37:01 +08:00
@AngryPanda swoole 研发成本其实并不低,比较起来,还不如用其他语言带来的性能提升来的 “核算”
AngryPanda
2019-01-29 16:37:27 +08:00
@echo404 单线程怎么同时处理这些多连接的请求呢。
AngryPanda
2019-01-29 16:39:30 +08:00
@ferock 的确如此。swoole 的协程还需要配合很多协程客户端来用,这点限制非常大。且和原来的 php 写法差异比较大。
echo404
2019-01-29 16:49:15 +08:00
@AngryPanda 就我的理解:worker 进程是多线程的,task 进程是单线程的,worker 进程接收到 reactor 进程传递过来的请求之后,将请求投递到 linux 系统自带的队列中去(这个过程是异步的),task 进程就一直读取这个队列中的消息进行处理
liuxu
2019-01-29 17:24:27 +08:00
1 核 2G,不会 1M 带宽吧,压测带宽跑满了吧
echo404
2019-01-30 10:34:25 +08:00
@liuxu 还真是,把网络这块给忘了
chdahuzi
2019-04-03 21:58:00 +08:00
@AngryPanda swoole4 内置了协程,即便不显示得用协程,每个请求都用到了协程

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://tanronggui.xyz/t/531560

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX