  int status = 0;

  // Payload and record key that are handed to the producer when the message is published.
  String msgToBeProduce = "message";
  String msgKey = "message key";

  // Set up the Kafka producer pool. Arguments, in order: broker host, broker port, topic,
  // maximum pool size, schema-registry URL (null here).
  // The returned key identifies the pool and is passed to every later ns_kafka_* call.
  // A null key means initialization failed, in which case the flow stops here.
  NSApi.JMSKey key = nsApi.ns_kafka_init_producer("127.0.0.1", 9092, "topic", 1, null);
  if (key == null) {
    System.out.println("Error in initializing kafka producer. Issue while initialising Look Up Table or with key insertion in it.");
    return 0;
  }
  
  // Select the Kafka security protocol for this producer ("ssl" here).
  // A false return counts as a failure only when the key also carries a non-zero error code.
  if (!nsApi.ns_kafka_set_security_protocol(key, "ssl")) {
    if (key.getErrCode() != 0) {
      System.out.println("Error in setting kafka security protocol.  Error code = " + key.getErrCode()
          + ", Error Msg = " + key.getErrMsg() + "\n");
      return 0;
    }
  }
  
  // Configure the SSL cipher list used with the SSL protocol ("ciphers" is a placeholder value).
  // A false return counts as a failure only when the key also carries a non-zero error code.
  if (!nsApi.ns_kafka_set_ssl_ciphers(key, "ciphers")) {
    if (key.getErrCode() != 0) {
      System.out.println("Error in setting Kafka ssl ciphers. Error code = " + key.getErrCode()
          + ", Error Msg = " + key.getErrMsg() + "\n");
      return 0;
    }
  }
  
  // Load the SSL key from the given file path; the key password (third argument) is mandatory.
  // A false return counts as a failure only when the key also carries a non-zero error code.
  if (!nsApi.ns_kafka_set_ssl_key_file(key, "/home/keyfile", "mWWDbw3Cjb0lke7UZuaS0A==")) {
    if (key.getErrCode() != 0) {
      System.out.println("Error in setting Kafka ssl key. Error code = " + key.getErrCode()
          + ", Error Msg = " + key.getErrMsg() + "\n");
      return 0;
    }
  }
  
  // Load the SSL certificate from the given file path.
  // A false return counts as a failure only when the key also carries a non-zero error code.
  if (!nsApi.ns_kafka_set_ssl_cert_file(key, "/home/certificatefile")) {
    if (key.getErrCode() != 0) {
      System.out.println("Error in setting Kafka ssl certificate. Error code = " + key.getErrCode()
          + ", Error Msg = " + key.getErrMsg() + "\n");
      return 0;
    }
  }
  
  // Load the SSL CA certificate from the given file path.
  // NOTE(review): the third argument looks like a password for the CA file - confirm against the API help.
  // A false return counts as a failure only when the key also carries a non-zero error code.
  if (!nsApi.ns_kafka_set_ssl_ca_file(key, "/home/trustedca", "mWWDbw3Cjb0lke7UZuaS0A==")) {
    if (key.getErrCode() != 0) {
      System.out.println("Error in setting Kafka ssl ca certificate. Error code = " + key.getErrCode()
          + ", Error Msg = " + key.getErrMsg() + "\n");
      return 0;
    }
  }
  
  // Sets the SSL CRL file from the given file path.
  // This is needed if broker verification is enabled.
  // NOTE(review): "crl" presumably stands for certificate revocation list - confirm against the API help.
  // ns_kafka_set_ssl_crl_file(key, crlFilePath)
  // A false return counts as a failure only when the key also carries a non-zero error code.
  if (!nsApi.ns_kafka_set_ssl_crl_file(key, "/home/crlfile") && key.getErrCode() != 0) {
    // The message names the CRL file so this failure is not confused with the
    // certificate-file failure, which previously printed the same text.
    System.out.println("Error in setting Kafka ssl crl file. Error code = " + key.getErrCode()
        + ", Error Msg = " + key.getErrMsg() + "\n");
    return 0;
  }
  
  // Initalizes Producer connection by getting connection from the given pool ID.
  // Returns the connection id  . will reuse connection if free or make new connection  ???
  // In case of error, it returns error code with error_msg (see help for details)
  // If connection is made, then transaction is set with name passed in second argument. for example here it sets KAFKAProducerConnect
  // If connection fails, then transaction is set with <TxName><ErrorMsg>. For example, KAFKAProducerConnectTimeout
  if(!nsApi.ns_kafka_get_connection(key, "KAFKAProducerSSLConnect"))
  {
  System.out.println("Error in getting Kafka connection. Error code = " + key.getErrCode() + ", Error Msg = " + key.getErrMsg() + "\n");
  return 0;
  }
  
  // Puts message in the JMS queue/topic using connection ID returned from get connection api
  // In case of error, it returns error code with error_msg (see help for details)
  // Here Transaction is set with name passed in sixth argument . for example here it sets KAFKAPutMsg
  // If api fails, then transaction is set with <TxName><ErrorMsg>. For example, KAFKAPutMsgConnectTimeout
  // ns_kafka_put_msg(jcid, msg, msg_len, "KAFKAPutMsg", error_msg)
  // new version of api nsApi.ns_kafka_put_msg 
  if(!nsApi.ns_kafka_put_msg_v2(key,  msgToBeProduce,  msgKey , "KAFKAPutMsg"))
  {
  System.out.println("Error in putting using kafka connection. Error code = " + key.getErrCode() + ", Error Msg = " + key.getErrMsg() + "\n");
  }
  
  // Releases the connection from connection pool 
  // It should be called every time so that another user can reuse it.
  if(!nsApi.ns_kafka_release_connection(key))
  {
  System.out.println("Error in Releasing kafka connection. Error code = " + key.getErrCode() + ", Error Msg = " + key.getErrMsg() + "\n");
  return 0;
  }
  
  /*
  // Closes connection with kafka server 
  // It should be called at the end of test and not every time as it may have performance overhead.
  if(!nsApi.ns_kafka_close_connection(key, "KAFKAProducerSSLClose"))        {
  System.out.println("Error in closing kafka connection. Key = " + key.getKeyMain() + " - " + key.getIdPool() + "\n");
  return 0;
  }
  
  */
  
  // Apply page think time, i.e. a delay before the next session starts.
  // The value passed here is 0.0.
  // NOTE(review): the unit is presumably seconds - confirm against the API help.
  nsApi.ns_page_think_time(0.0);