package org.thingsboard.rule.engine.node.external;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.eclipse.util.URIUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;

@RuleNode(type = ComponentType.EXTERNAL, name = "pulsar client producer", configClazz = TbPulsarClientProducerConfiguration.class, nodeDescription = "CRNEX01 Pulsar Client Producer - Publish message to pulsar topic via client.", nodeDetails = "The whole message body will be used as the content of pulsar payload. Message is sent with default <code>byte</code> schema.<br /><br />Support publishing to one or more brokers. Example:<br />\"serverURL\": \"192.168.3.1:6650,192.168.3.2:6650,192.168.3.3:6650\"<br /><br />If fail to publish, route out as <b>Failure</b> chain with the error message.<br /><br />Hint: for <code>topic</code>, use ${metadataKey} for value from metadata, $[messageKey] for value from message body.", uiResources = {}, configDirective = "")
/* loaded from: input_file:org/thingsboard/rule/engine/node/external/TbPulsarClientProducer.class */
public class TbPulsarClientProducer implements TbNode {
    private static final ObjectMapper mapper = new ObjectMapper();
    private TbPulsarClientProducerConfiguration config;
    private boolean enableTLS;
    private String serverURL;
    private String tenant;
    private String namespace;
    PulsarClient client;
    private ConcurrentMap<String, Producer<byte[]>> producerCache;

    public void init(TbContext tbContext, TbNodeConfiguration tbNodeConfiguration) throws TbNodeException {
        this.config = (TbPulsarClientProducerConfiguration) TbNodeUtils.convert(tbNodeConfiguration, TbPulsarClientProducerConfiguration.class);
        this.enableTLS = this.config.isEnableTLS();
        this.serverURL = this.config.getServerURL();
        this.tenant = this.config.getTenant();
        this.namespace = this.config.getNamespace();
        this.producerCache = new ConcurrentHashMap();
        try {
            this.client = PulsarClient.builder().serviceUrl("pulsar://" + this.serverURL).build();
        } catch (PulsarClientException e) {
            throw new TbNodeException(e);
        }
    }

    public void onMsg(TbContext tbContext, TbMsg tbMsg) throws ExecutionException, InterruptedException, TbNodeException {
        try {
            String processPattern = TbNodeUtils.processPattern(this.config.getTopic(), tbMsg);
            this.producerCache.computeIfAbsent(processPattern, str -> {
                try {
                    return this.client.newProducer().topic("persistent://" + this.tenant + URIUtil.SLASH + this.namespace + URIUtil.SLASH + processPattern).sendTimeout(10, TimeUnit.SECONDS).create();
                } catch (PulsarClientException e) {
                    throw new RuntimeException(e);
                }
            }).send(tbMsg.getData().getBytes());
            tbContext.tellSuccess(tbMsg);
        } catch (Exception e) {
            tbContext.tellFailure(tbMsg, e);
        }
    }

    public void destroy() {
        try {
            Iterator<Producer<byte[]>> it = this.producerCache.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.client.close();
        } catch (PulsarClientException e) {
        }
    }
}
