HiveUDF函数实现模糊匹配

11 次阅读 预计阅读时间: 10 分钟


n

应用背景:业务逻辑需要写一些模糊匹配项,比如column1字段包含"abc"的数据,我们经常会这样: column like '%abc%'column regexp 'abc'。假如要包含的不仅仅是abc,可能还有abc1、abc2、….,那我们就需要写很多or like语句,对内存消耗是极大的,那下面这个东西就是就是为了解决这个匹配多词的。

nnnn

一、构造关键词字典数

nnnn
import java.util.*;n/**n* 构造字典树n* @Description: TODOn* @author zhangjinken* @date 2021-03-31 14:13:57n* @version V1.0n*/npublic class Trie n{n   public Node StartNode;//起始状态n   public Map<String, Integer> m_Keywords = new HashMap<String, Integer>();//记录了当前关键词集合n   //初始化Trien   public Trie() n   {n       StartNode = new Node(null);n       StartNode.m_Failure = StartNode;n   }n   // 清除所有Trie节点n   public void closeTrie() n   {n      CleanStates(StartNode);n   }n    // 增加关键词n   public void AddKeyword(String keyword) n   {n      m_Keywords.put(keyword, 0);n      //RebuildTrie();n   }n   // 删除关键词n   public void DeleteKeyword(String keyword) n   {n      m_Keywords.remove(keyword);n      RebuildTrie();n   }n   // 检索字符串text是否包含关键词,返回字符串n   public String Search(String text) n   {n      Node curState = StartNode;n      int i;n      HashSet<String> res=new HashSet<String>();n      // 查看状态机中当前状态下该字符对应的下一状态,如果在当前状态下找不到满足该个字符的状态路线,n      // 则返回到当前状态的失败状态下继续寻找,直到初始状态    n      for (i = 0; i < text.length(); ++i) n      {n//           while (!curState.m_Goto.containsKey(text.charAt(i)) == false)n          while (!curState.m_Goto.containsKey(text.charAt(i)))n           {n               if (curState.m_Failure != StartNode) n               {n                  if (curState == curState.m_Failure) n                  { //陷入死循环了...n                       System.out.println("Trie Failure");n                  break;n                  }n                  curState = curState.m_Failure; // 返回到当前状态的失败状态n               } n               else n               {n                  curState = StartNode;n                  break;n                }n           }n           // 如果当前状态下能找到该字符对应的下一状态,则跳到下一状态m,n           // 如果状态m包含了m_Output,表示匹配到了关键词,具体原因请继续往下看n           if (curState.m_Goto.containsKey(text.charAt(i))) n           {n              curState = curState.m_Goto.get(text.charAt(i));n              if (!curState.m_Output.isEmpty()) n              {n                 for(int j=0;j<curState.m_Output.size();j++)n                {n                      res.add(curState.m_Output.get(j));n                 }n              }n           }n       }n     return res.toString();n }n   // 检索字符串text是否包含关键词,返回HashSetn   public HashSet<String> SearchSet(String text) n   {n      Node curState = StartNode;n      int i;n      HashSet<String> res=new HashSet<String>();n      // 查看状态机中当前状态下该字符对应的下一状态,如果在当前状态下找不到满足该个字符的状态路线,n      // 则返回到当前状态的失败状态下继续寻找,直到初始状态    n      for (i = 0; i < text.length(); ++i) n      {n           while (!curState.m_Goto.containsKey(text.charAt(i)))n           {n               if (curState.m_Failure != StartNode) n               {n                  if (curState == curState.m_Failure) n                  { //陷入死循环了...n                       System.out.println("Trie Failure");n                  break;n                  }n                  curState = curState.m_Failure; // 返回到当前状态的失败状态n               } n               else n               {n                  curState = StartNode;n                  break;n                }n           }n           // 如果当前状态下能找到该字符对应的下一状态,则跳到下一状态m,n           // 如果状态m包含了m_Output,表示匹配到了关键词,具体原因请继续往下看n           if (curState.m_Goto.containsKey(text.charAt(i))) n           {n              curState = curState.m_Goto.get(text.charAt(i));n              if (!curState.m_Output.isEmpty()) n              {n                 for(int j=0;j<curState.m_Output.size();j++)n                {n                      res.add(curState.m_Output.get(j));n                 }n              }n           }n       }n     return res;n }n /**n  * 内部类Node,表示Trie的状态节点n  */n private class Node n {n    public Node(Node parent) n    {n       this.m_Parent = parent;n       this.m_Failure = null;n     }n    // 记录了该状态节点下,字符-->另一个状态的对应关系n    public Map<Character, Node> m_Goto = new HashMap<Character, Node>();n    // 如果该状态下某具体字符找不到对应的下一状态,应该跳转到m_Failure状态继续查找n    public Node m_Failure;n    // 该状态节点的前一个节点n    public Node m_Parent;n    // 记录了到达该节点时,匹配到的关键词n    public List<String> m_Output = new ArrayList<String>();n    // 为当前状态节点添加字符c对应的下一状态n    Node AddGoto(char c) n    {n       if (!m_Goto.containsKey(c)) n       {n          // not in the goto tablen          Node newState = new Node(this);n          m_Goto.put(c, newState);n          return newState;n       } n       else n       {n          return m_Goto.get(c);n       }n    } n };n // 添加关键词到Trie节点n void DoAddWord(String keyword) n {     n    int i;n    Node curState = StartNode;n    for (i = 0; i < keyword.length(); i++) n    {n        curState = curState.AddGoto(keyword.charAt(i));n    }n     curState.m_Output.add(keyword);n }n // 建立Trien public void RebuildTrie() n {n    CleanStates(StartNode);n    StartNode = new Node(null);n    StartNode.m_Failure = StartNode;n    // add all keywordsn    for (String key : m_Keywords.keySet()) n    {n       DoAddWord(key);n    }n    // 为每个状态节点设置失败跳转的状态节点n    DoFailure();n }n // 清除state下的所有状态节点n void CleanStates(Node state) n {n    for (char key : state.m_Goto.keySet()) n    {n        CleanStates(state.m_Goto.get(key));n    }n    state = null;n }n // 为每个状态节点设置失败跳转的状态节点n void DoFailure() n {  n    LinkedList<Node> q = new LinkedList<Node>();n    // 首先设置起始状态下的所有子状态,设置他们的m_Failure为起始状态,并将他们添加到q中n    for (char c : StartNode.m_Goto.keySet()) n    {n       q.add(StartNode.m_Goto.get(c));n       StartNode.m_Goto.get(c).m_Failure = StartNode;n    }n    while (!q.isEmpty()) n    {n       // 获得q的第一个element,并获取它的子节点,为每个子节点设置失败跳转的状态节点n      Node r = q.getFirst();n      Node state;n      q.remove();n      for (char c : r.m_Goto.keySet()) n      {n         q.add(r.m_Goto.get(c));n         // 从父节点的m_Failure(m1)开始,查找包含字符c对应子节点的节点,n         // 如果m1找不到,则到m1的m_Failure查找,依次类推n         state = r.m_Failure;n         while (!state.m_Goto.containsKey(c))n         {n             state = state.m_Failure;n             if (state == StartNode) n             {n                 break;n             }n         }n         // 如果找到了,设置该子节点的m_Failure为找到的目标节点(m2),n         // 并把m2对应的关键词列表添加到该子节点中n         if (state.m_Goto.containsKey(c)) n         {n             r.m_Goto.get(c).m_Failure = state.m_Goto.get(c);n             for (String str : r.m_Goto.get(c).m_Failure.m_Output) n             {n                 r.m_Goto.get(c).m_Output.add(str);      n             }n          }n         else n         { //找不到,设置该子节点的m_Failure为初始节点n             r.m_Goto.get(c).m_Failure = StartNode;n          }n        }n      }n  }n};
nnnn

二、自定义UDF函数

nnnn
import org.apache.hadoop.hive.ql.exec.UDF;nimport sun.rmi.runtime.Log;nimport java.io.BufferedReader;nimport java.io.FileReader;nimport java.io.InputStream;nimport java.io.InputStreamReader;nimport java.util.HashSet;npublic class KeywordsUDF  extends UDFn{n    public static Trie trie;n    public KeywordsUDF() n    {n        trie=new Trie();n        tryn        {n            InputStream is=this.getClass().getResourceAsStream("/resource/dic.txt");n            BufferedReader br=new BufferedReader(new InputStreamReader(is));n            String line=br.readLine();n            int i=0;n            while(null!=line)n            {n                i++;n                trie.AddKeyword(line.trim().replace(" ",""));n                line=br.readLine();n            }n            br.close();n            trie.RebuildTrie();n        }n        catch(Exception e){n            e.printStackTrace();n        }n    }n    public String evaluate(String comment)n    {n        if(comment!=null&&!comment.isEmpty()) {n            HashSet<String> hashSet = new HashSet<>();n            hashSet = trie.SearchSet(comment);n            if (hashSet.size() > 0) {n                return "bad";n            } else return "pass";n        }n        else return "null";n    }n}
nnnn

三、字典文件

nnnn

dic.txt(dic.dic也行),字典里面关键词就是按Windows的回车换行分隔,将字典文件放到项目的resource目录下【没有resource或者不想放那的话,等打包后自己手动把字典后加到对应文件夹下】。
四、准备上传的jar包
把将你的工程【包含字典树类和udf的项目】打成jar包,一定要保证jar包里有你的字典!!!

nnnn

五、上传使用

nnnn

项目内打JAR包,上传HDFS集群

nnnn
ADD jar /home/hadoop_hive/zhangjinke/TestAnalysis-test.jar;nCREATE TEMPORARY FUNCTION is_Get AS 'KeywordsUDF'; -- 这里的KeywordsUDF类我是直接放到了java文件夹下的nselect is_Get('包含关键词的语句'); -- 输出badn-- select is_Get('未包含字典中任何一关键词的字符串'); -- 输出pass
nnnn

六、注意事项

nnnn

udf函数中的路径默认从本地机器读取,hive是分布式的,所以你要把字典【dic.txt】上传到每一个机器上,不易操作
那么我们就直接放到jar包中,使用 InputStream is=this.getClass().getResourceAsStream("/resource/dic.txt");
将字典确保每台机器顺利从有jar包【包里有字典】的机器上拿字典及编译后的字节码文件,这种方法避免了出现找不到文件路径的错误。

n
最后更新于 2023-07-28