• 125417

    文章

  • 803

    评论

  • 12

    友链

  • 最近新加了换肤功能,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

Prometheus Python Client 多进程问题的解释与解决

服了这份高薪指南,涨多少你说了算>>

本文描述基于 prometheus-client (0.8.0) 版本。

Client 存储数据的方法与问题

官方 client 用于存储数据(不管是什么 Metric 类型)使用的是一个 ValueClass 对象,默认情况下定义是:

class MutexValue(object):
    """A float protected by a mutex."""

    _multiprocess = False

    def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
        self._value = 0.0
        self._lock = Lock()

    def inc(self, amount):
        with self._lock:
            self._value += amount

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value

使用这个对象而不是 float 的目的主要应该是为了在多线程情况下加锁。

显然这个值是不能在多进程场景下共享的,而多进程模式对于 Python 来说又是一种非常常用的模式,因此出现了一个

如何在多进程模式下共享采集的数据?

的问题。每个进程分别采集是解决问题的方案之一,但并不总是好使。

一种情况是如果进程数非常多,重复采集会导致存储在 Prometheus 服务端的数据量非常大,这些数据的统计维度只有 PID 的区别,这种区别往往没有实际用处,却耗费了很多的服务端性能。

另一种情况是在使用类似 Gunicorn 这种 Server 的时候,多子进程对外只暴露一个端口,HTTP Scrap 的采集方式无法生效,因为你一次只能随机访问到其中一个子进程的数据。

官方解决方案

官方给出的解决方案描述在 github.com/prometheus/client_python 可以看到。

具体地说就是为上面的 MutexValue 提供了一个替代品 MultiProcessValue

def MultiProcessValue(process_identifier=os.getpid):
    """Returns a MmapedValue class based on a process_identifier function.

    The 'process_identifier' function MUST comply with this simple rule:
    when called in simultaneously running processes it MUST return distinct values.

    Using a different function than the default 'os.getpid' is at your own risk.
    """
    files = {}
    values = []
    pid = {'value': process_identifier()}
    # Use a single global lock when in multi-processing mode
    # as we presume this means there is no threading going on.
    # This avoids the need to also have mutexes in __MmapDict.
    lock = Lock()

    class MmapedValue(object):
        """A float protected by a mutex backed by a per-process mmaped file."""

        _multiprocess = True

        def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
            self._params = typ, metric_name, name, labelnames, labelvalues, multiprocess_mode
            with lock:
                self.__check_for_pid_change()
                self.__reset()
                values.append(self)

        def __reset(self):
            typ, metric_name, name, labelnames, labelvalues, multiprocess_mode = self._params
            if typ == 'gauge':
                file_prefix = typ + '_' + multiprocess_mode
            else:
                file_prefix = typ
            if file_prefix not in files:
                filename = os.path.join(
                    os.environ['prometheus_multiproc_dir'],
                    '{0}_{1}.db'.format(file_prefix, pid['value']))

                files[file_prefix] = MmapedDict(filename)
            self._file = files[file_prefix]
            self._key = mmap_key(metric_name, name, labelnames, labelvalues)
            self._value = self._file.read_value(self._key)

        def __check_for_pid_change(self):
            actual_pid = process_identifier()
            if pid['value'] != actual_pid:
                pid['value'] = actual_pid
                # There has been a fork(), reset all the values.
                for f in files.values():
                    f.close()
                files.clear()
                for value in values:
                    value.__reset()

        def inc(self, amount):
            with lock:
                self.__check_for_pid_change()
                self._value += amount
                self._file.write_value(self._key, self._value)

        def set(self, value):
            with lock:
                self.__check_for_pid_change()
                self._value = value
                self._file.write_value(self._key, self._value)

        def get(self):
            with lock:
                self.__check_for_pid_change()
                return self._value

    return MmapedValue


def get_value_class():
    # Should we enable multi-process mode?
    # This needs to be chosen before the first metric is constructed,
    # and as that may be in some arbitrary library the user/admin has
    # no control over we use an environment variable.
    if 'prometheus_multiproc_dir' in os.environ:
        return MultiProcessValue()
    else:
        return MutexValue


ValueClass = get_value_class()

简单解释一下就是:锁的部分没有变,值的存储从 float 的基础上又增加了 MmapedDict 的一个 value:

class MmapedDict(object):
    """A dict of doubles, backed by an mmapped file.

    The file starts with a 4 byte int, indicating how much of it is used.
    Then 4 bytes of padding.
    There's then a number of entries, consisting of a 4 byte int which is the
    size of the next field, a utf-8 encoded string key, padding to a 8 byte
    alignment, and then a 8 byte float which is the value.

    Not thread safe.
    """

Mmap 并不关键,理解成一种内存值序列化到文件系统的方法即可。更大的一个改变是实例化 ValueClass 时得到的对象。原来得到的每个对象之间都是独立的,互相不感知。多进程模式下得到的对象都存储在同一个闭包里,这些对象还共享一个 MmapedDict 的存储空间,通过 mmap_key 来区分彼此。

如果去看一下存储 MMapDict 的目录,会发现一些这样的文件:

-rw-r--r--   1 foo  staff  1048576  8  4 17:17 counter_86997.db
-rw-r--r--   1 foo  staff  1048576  8  4 17:23 gauge_all_87328.db
-rw-r--r--   1 foo  staff  1048576  8  4 17:17 histogram_87029.db

这里每个文件对应一个 MmapDict 对象,对象的每一对 k, v 则对应一个 MmapedValue 实例。

这种操作就像在 redis 里创建一个 Hash 对象,key 是 histogram_{pid},里面的键值对是:

django_http_requests_latency_seconds{le="1.0",method="GET",view="xxx.views.Metrics"} = 12.0
...

可以看到,在 observe 的时候,每个进程仍然是各采各的。MultiProcessValue 的功能仅仅是把数据序列化到了一个文件里。多进程数据的真实合并操作发生在 collect 的时候:

class MultiProcessCollector(object):
    def collect(self):
        files = glob.glob(os.path.join(self._path, '*.db'))
        return self.merge(files, accumulate=True)

collect 方法扫描了文件目录,并把所有数据合并起来。

官方方案存在的问题

1. 存储文件性能差

大概是为了绕过多进程写文件锁的问题,官方方案选择让每个进程写一组专属的文件,并将聚合后置。但是相同数量的数据,在 tag 分布均匀的情况下,存储 N 个文件的数据量是单个文件的 N 倍,这会带来磁盘 IO 和 CPU 性能的额外开销,而且增幅相当可观。这个问题 Issue 里也有人提,官方也反馈写到一个文件里是个值得一试的主意,但还没有提上日程。

2. 文件清理不可靠

当进程推出时,文件应该被删除,否则会造成脏数据和历史文件的无限堆积。对于文件的清理官方提供了针对 Gunicorn 的配置函数:child_exit。但这个需要清理的场景其实广泛存在:Celery 需要,Command 需要,自己起的 Shell 也需要。如果官方方案覆盖不完整,是很难期望用户自己能清理干净的。

一些替代方案的想法

1. DB

DB 方案的一种实现,Redis 方案的实现细节其实上面已经解释过了。DB 天生适合解决多进程写数据的问题,可以维护单独一份数据,甚至还可以把主机维度的 tag 消除掉(如果不需要的话),这可以把存储性能提到最高,且不需要耗费 CPU。

缺点的话就是增加了对 DB,以至网络的依赖,降低了系统的 SLA。是否值得取决于具体系统。

Redis 和 SQL 的选择区别不是很大,关键点都是 observe/collect 时候的循环语句问题。尽可能把多个 Value 的操作语句进行合并以提高网络 IO 效率。比如 observe 一个 histogram 的小值,可能导致多个 Value 对象发生变化。collect 的时候也需要扫描全部数据。

我曾经基于 0.4 版本的 Client 实现过一个 RedisValue(吐槽一下早期版本的代码质量有点偏低)。Redis 的好处是他的 observe 操作几乎总是耗费常数时间。但 collect 的时间复杂度是 O(N)。这在数据量变大以后(几千行)就慢的难以忍受了。后来我通过减少 Bucket 数量和使用 hmget 方法把复杂度降到了 O(LogN),接口响应时间也稳定在了一个可接受的水平,且增长缓慢。如果继续优化的话,也许还可以利用 Pipeline 把耗时再降低一级。

值得思考的是,Prometheus 改变传统的监控模式,把实时数据的存储 client 化,其实一大优势就是消除了监控数据集中丢失的风险。但本方案又引入了一个中心化存储组件,等于把风险又带了回来... 因此虽然正在用着,我对这个方案还是持怀疑态度。如果改成与服务结伴部署的分布式数据库,似乎可以在取得 DB 优势的情况下避免中心化风险,但维护成本会增高一些,尤其是在容器化场景下。这里像 SQLite 这样的无服务进程可能是最折衷的选择,但我对 SQLite 不了解,又担心它的性能或稳定性不好...

未完待续...


695856371Web网页设计师②群 | 喜欢本站的朋友可以收藏本站,或者加入我们大家一起来交流技术!

0条评论

Loading...


自定义皮肤 主体内容背景
打开支付宝扫码付款购买视频教程
遇到问题联系客服QQ:419400980
注册梁钟霖个人博客